HEX
Server: Apache
System: Linux scp1.abinfocom.com 5.4.0-216-generic #236-Ubuntu SMP Fri Apr 11 19:53:21 UTC 2025 x86_64
User: confeduphaar (1010)
PHP: 8.1.33
Disabled: exec,passthru,shell_exec,system
Upload Files
File: //usr/lib/python3/dist-packages/twisted/test/__pycache__/test_unix.cpython-38.pyc
U


W[5;�@s*dZddlmZmZddlZddlZddlZddlZddlm	Z	m
Z
mZmZm
Z
ddlmZmZddlmZddlmZmZddlmZdd	lmZdd
lmZmZGdd�dej�ZGd
d�dej�ZGdd�dej �Z!Gdd�dej"�Z#Gdd�dej�Z$e	�%e
d��sde_&e	�'e
d��s&de$_&dS)zK
Tests for implementations of L{IReactorUNIX} and L{IReactorUNIXDatagram}.
�)�division�absolute_importN)�
interfaces�reactor�protocol�error�address)�defer�utils)�lockfile)�_PY3�
networkString)�FilePath)�unittest)�MyServerFactory�MyClientFactoryc@seZdZdd�Zdd�ZdS)�FailedConnectionClientFactorycCs
||_dS�N)�onFail)�selfr�r�8/usr/lib/python3/dist-packages/twisted/test/test_unix.py�__init__sz&FailedConnectionClientFactory.__init__cCs|j�|�dSr)rZerrback)rZ	connector�reasonrrr�clientConnectionFailedsz4FailedConnectionClientFactory.clientConnectionFailedN)�__name__�
__module__�__qualname__rrrrrrrsrc@sjeZdZdZdd�Zdd�Zdd�Zdd	�Zd
d�Zdd
�Z	dd�Z
dd�Zdd�Ze
r^de_dd�ZdS)�UnixSocketTestsz
    Test unix sockets.
    cs����}����t��t��}�_t�|��}��|j�t	�	t	j
t	j�}��|j�|�
��|�|����fdd�}|�|�|S)z�
        The address passed to the server factory's C{buildProtocol} method and
        the address returned by the connected protocol's transport's C{getPeer}
        method match the address the client socket is bound to.
        cs0t���}���j|g���|j��|�dSr)r�UNIXAddress�assertEqual�
peerAddresses�	transportZgetPeer)�protoZexpected�Zpeernamer�
serverFactoryrr�
cbConnMade6s
z1UnixSocketTests.test_peerBind.<locals>.cbConnMade)�mktemprr	�Deferred�protocolConnectionMader�
listenUNIX�
addCleanup�
stopListening�socketZAF_UNIXZSOCK_STREAM�closeZbindZconnect�addCallback)r�filenameZconnMade�unixPortZ
unixSocketr&rr$r�
test_peerBind&s


zUnixSocketTests.test_peerBindcs�����t�}t��}||_t��|�}��|j�t	��t��}|�_t�
���t�||g�}���fdd�}|�|�|S)z�
        L{IReactorUNIX.connectUNIX} can be used to connect a client to a server
        started with L{IReactorUNIX.listenUNIX}.
        cs6|\}}���jt���g�|j��|j��dSr)r r!rrr"�loseConnection)�args�serverProtocolZclientProtocol��
clientFactoryr0rrr�allConnectedNs
�
z1UnixSocketTests.test_dumber.<locals>.allConnected)
r'rr	r(r)rr*r+r,r�connectUNIX�
gatherResultsr/)rr%�serverConnMader1�clientConnMade�dr8rr6r�test_dumber>s	
zUnixSocketTests.test_dumbercs�����t�}t��}||_tj�|dd����t�	�d��t
��t��}|�_tj��dd�t�||g�}����fdd�}|�
|���fdd	�}|�
|�|S)
z�
        A lockfile is created and locked when L{IReactorUNIX.listenUNIX} is
        called and released when the Deferred returned by the L{IListeningPort}
        provider's C{stopListening} method is called back.
        T�ZwantPID�.lock��ZcheckPIDcs:|\}}���jt���g�|j��|j�����Sr)r r!rrr"r3r,)r4r5ZclientProto�r7r0rr1rr�
_portStuffqs
�

z0UnixSocketTests.test_pidFile.<locals>._portStuffcs��t��d�d�dS)Nr@�locked)ZassertFalser�isLocked�Zignored)r0rrr�_check~sz,UnixSocketTests.test_pidFile.<locals>._check)r'rr	r(r)rr*Z
assertTruerrFrr9r:r/)rr%r;r<r=rDrHrrCr�test_pidFile[s 

zUnixSocketTests.test_pidFilecsR|���t��tj��dd�}|jtjtj��dd���fdd�}|���|�S)z�
        L{IReactorUNIX.listenUNIX} raises L{error.CannotListenError} if passed
        the name of a file on which a server is already listening.
        Tr?cstj��dd�}|��S�NTr?)rr*r,)�ignr1�r0r%rr�stoppedListening�sz<UnixSocketTests.test_socketLocking.<locals>.stoppedListening)	r'rrr*�assertRaisesr�CannotListenErrorr,r/)rr1rMrrLr�test_socketLocking�s�z"UnixSocketTests.test_socketLockingcCsj|��|_td|jf�}dttj�tj���	�ji}ttj
��	�j}tj|dd|f|d�}|�
|�|S)Nzmfrom twisted.internet import protocol, reactor
reactor.listenUNIX(%r, protocol.ServerFactory(),wantPID=True)
s
PYTHONPATHs-us-c)�env)r'r0r
r�os�pathsep�join�sys�pathZasBytesMode�
executabler
ZgetProcessValuer/)r�callback�sourcerQZpyExer=rrr�_uncleanSocketTest�s
��
z"UnixSocketTests._uncleanSocketTestcs�fdd�}��|�S)a"
        If passed C{True} for the C{wantPID} parameter, a server can be started
        listening with L{IReactorUNIX.listenUNIX} when passed the name of a
        file on which a previous server which has not exited cleanly has been
        listening using the C{wantPID} option.
        cstj�jt�dd�}|��SrJ)rr*r0rr,)rK�p�rrr�ranStupidChild�szGUnixSocketTests.test_uncleanServerSocketLocking.<locals>.ranStupidChild�rZ�rr]rr\r�test_uncleanServerSocketLocking�sz/UnixSocketTests.test_uncleanServerSocketLockingcs�fdd�}��|�S)z�
        If passed C{True} for the C{checkPID} parameter, a client connection
        attempt made with L{IReactorUNIX.connectUNIX} fails with
        L{error.BadFileError}.
        cs0t��}t|�}tj�j|dd���|tj�S)NTrB)	r	r(rrr9r0Z
assertFailurerZBadFileError)rKr=�fr\rrr]�szCUnixSocketTests.test_connectToUncleanServer.<locals>.ranStupidChildr^r_rr\r�test_connectToUncleanServer�sz+UnixSocketTests.test_connectToUncleanServercsj���}t�||��d�|f}��t��|���t��|�t��j�}���fdd�}|�	|�|S)z~
        Test the C{__str__} and C{__repr__} implementations of a UNIX port when
        used with the given factory.
        �
<%s on %r>cs.d�f}��t��|���t��|�dS�Nz<%s (not listening)>�r �repr�str�rKZunconnectedString��factoryNamerr1rrrM�s
z3UnixSocketTests._reprTest.<locals>.stoppedListening)
r'rr*r rfrgr	�
maybeDeferredr,r/)rr%rjr0�connectedStringr=rMrrir�	_reprTest�s
zUnixSocketTests._reprTestcCs*Gdd�d�}|�|tj�|�|�d�S)a
        The two string representations of the L{IListeningPort} returned by
        L{IReactorUNIX.listenUNIX} contains the name of the classic factory
        class being used and the filename on which the port is listening or
        indicates that the port is not listening.
        c@seZdZdd�Zdd�ZdS)zCUnixSocketTests.test_reprWithClassicFactory.<locals>.ClassicFactorycSsdSrrr\rrr�doStart�szKUnixSocketTests.test_reprWithClassicFactory.<locals>.ClassicFactory.doStartcSsdSrrr\rrr�doStop�szJUnixSocketTests.test_reprWithClassicFactory.<locals>.ClassicFactory.doStopN�rrrrnrorrrr�ClassicFactory�srqz%twisted.test.test_unix.ClassicFactory��assertIsInstance�typesZ	ClassTyperm)rrqrrr�test_reprWithClassicFactory�s�z+UnixSocketTests.test_reprWithClassicFactory�)Classic classes do not exist on Python 3.cCs*Gdd�dt�}|�|t�|�|�d�S)a!
        The two string representations of the L{IListeningPort} returned by
        L{IReactorUNIX.listenUNIX} contains the name of the new-style factory
        class being used and the filename on which the port is listening or
        indicates that the port is not listening.
        c@seZdZdd�Zdd�ZdS)zEUnixSocketTests.test_reprWithNewStyleFactory.<locals>.NewStyleFactorycSsdSrrr\rrrrn�szMUnixSocketTests.test_reprWithNewStyleFactory.<locals>.NewStyleFactory.doStartcSsdSrrr\rrrro�szLUnixSocketTests.test_reprWithNewStyleFactory.<locals>.NewStyleFactory.doStopNrprrrr�NewStyleFactory�srwz&twisted.test.test_unix.NewStyleFactory��objectrs�typerm)rrwrrr�test_reprWithNewStyleFactory�s�z,UnixSocketTests.test_reprWithNewStyleFactoryN)rrr�__doc__r2r>rIrPrZr`rbrmrur�skipr{rrrrr"s)�rc@s8eZdZdZZdZdd�Zdd�Zdd�Zd	d
�Z	dS)�ClientProtoFNcCst��|_t��|_dSr)r	r(�deferredStarted�deferredGotBackr\rrrrs
zClientProto.__init__cCs
d|_dS�NT��stoppedr\rrr�stopProtocolszClientProto.stopProtocolcCsd|_|j�d�dSr���startedrrXr\rrr�
startProtocolszClientProto.startProtocolcCs||_|j�d�dSr)�gotbackr�rX)r�datarrr�datagramReceivedszClientProto.datagramReceived)
rrrr�r�r�rr�r�r�rrrrr~sr~c@s<eZdZdZZdZZdd�Zdd�Zdd�Z	d	d
�Z
dS)�ServerProtoFNcCst��|_t��|_dSr)r	r(r�deferredGotWhatr\rrrr s
zServerProto.__init__cCs
d|_dSr�r�r\rrrr�$szServerProto.stopProtocolcCsd|_|j�d�dSr�r�r\rrrr�'szServerProto.startProtocolcCs*||_|j�d|�||_|j�d�dS)N�hi back)�gotfromr"�write�gotwhatr�rX)rr��addrrrrr�+szServerProto.datagramReceived)rrrr�r�r�r�rr�r�r�rrrrr�sr�c@sBeZdZdZdd�Zdd�Zdd�Zdd	�Zer6d
e_	dd�Z
d
S)�DatagramUnixSocketTestsz%
    Test datagram UNIX sockets.
    cs��������}t��t��t�|��}��|j�tj|��d�}��|j�t�	�j
�j
g�}��fdd�}����fdd�}|�|�|�|�|S)zf
        Test that a datagram can be sent to and received by a server and vice
        versa.
        )ZbindAddresscs�j�d�t��j�jg�S)N�hi)r"r�r	r:r�r�rG)�cp�sprrr�Fs�z4DatagramUnixSocketTests.test_exchange.<locals>.writecs.��d�j�����j���d�j�dS)Nr�r�)r r�r�r�rG�Z
clientaddrr�rr�rr�_cbTestExchangeKsz>DatagramUnixSocketTests.test_exchange.<locals>._cbTestExchange)r'r�r~r�listenUNIXDatagramr+r,ZconnectUNIXDatagramr	r:rr/)rZ
serveraddr�s�cr=r�r�rr�r�
test_exchange7s

z%DatagramUnixSocketTests.test_exchangecCsD|��}t�}t�||�}|�tjtj||�|��t�	|�dS)z�
        L{IReactorUNIXDatagram.listenUNIXDatagram} raises
        L{error.CannotListenError} if the unix socket specified is already in
        use.
        N)
r'r�rr�rNrrOr,rR�unlink)rr�r[r�rrr�test_cannotListenUs�z)DatagramUnixSocketTests.test_cannotListencsj���}t�||��d�|f}��t��|���t��|�t��j�}���fdd�}|�	|�|S)z�
        Test the C{__str__} and C{__repr__} implementations of a UNIX datagram
        port when used with the given protocol.
        rccs.d�f}��t��|���t��|�dSrdrerh��protocolNamerr1rrrMrs
z;DatagramUnixSocketTests._reprTest.<locals>.stoppedListening)
r'rr�r rfrgr	rkr,r/)rZserverProtor�r0rlZstopDeferredrMrr�rrmes
z!DatagramUnixSocketTests._reprTestcCs*Gdd�d�}|�|tj�|�|�d�S)a0
        The two string representations of the L{IListeningPort} returned by
        L{IReactorUNIXDatagram.listenUNIXDatagram} contains the name of the
        classic protocol class being used and the filename on which the port is
        listening or indicates that the port is not listening.
        c@seZdZdd�Zdd�ZdS)zMDatagramUnixSocketTests.test_reprWithClassicProtocol.<locals>.ClassicProtocolcSsdSrr�rr"rrr�makeConnection�sz\DatagramUnixSocketTests.test_reprWithClassicProtocol.<locals>.ClassicProtocol.makeConnectioncSsdSrrr\rrrro�szTDatagramUnixSocketTests.test_reprWithClassicProtocol.<locals>.ClassicProtocol.doStopN�rrrr�rorrrr�ClassicProtocol�sr�z&twisted.test.test_unix.ClassicProtocolrr)rr�rrr�test_reprWithClassicProtocolzs�z4DatagramUnixSocketTests.test_reprWithClassicProtocolrvcCs*Gdd�dt�}|�|t�|�|�d�S)a2
        The two string representations of the L{IListeningPort} returned by
        L{IReactorUNIXDatagram.listenUNIXDatagram} contains the name of the
        new-style protocol class being used and the filename on which the port
        is listening or indicates that the port is not listening.
        c@seZdZdd�Zdd�ZdS)zODatagramUnixSocketTests.test_reprWithNewStyleProtocol.<locals>.NewStyleProtocolcSsdSrrr�rrrr��sz^DatagramUnixSocketTests.test_reprWithNewStyleProtocol.<locals>.NewStyleProtocol.makeConnectioncSsdSrrr\rrrro�szVDatagramUnixSocketTests.test_reprWithNewStyleProtocol.<locals>.NewStyleProtocol.doStopNr�rrrr�NewStyleProtocol�sr�z'twisted.test.test_unix.NewStyleProtocolrx)rr�rrr�test_reprWithNewStyleProtocol�s�z5DatagramUnixSocketTests.test_reprWithNewStyleProtocolN)rrrr|r�r�rmr�rr}r�rrrrr�3s�r�z1This reactor does not support UNIX domain socketsz3This reactor does not support UNIX datagram sockets)(r|Z
__future__rrrRrUrtr-Ztwisted.internetrrrrrr	r
Ztwisted.pythonrZtwisted.python.compatrr
Ztwisted.python.filepathrZ
twisted.trialrZtwisted.test.test_tcprrZ
ClientFactoryrZTestCaserZConnectedDatagramProtocolr~ZDatagramProtocolr�r�ZIReactorUNIXr}ZIReactorUNIXDatagramrrrr�<module>s,	fv