HEX
Server: Apache
System: Linux scp1.abinfocom.com 5.4.0-216-generic #236-Ubuntu SMP Fri Apr 11 19:53:21 UTC 2025 x86_64
User: confeduphaar (1010)
PHP: 8.1.33
Disabled: exec,passthru,shell_exec,system
Upload Files
File: //usr/lib/python3/dist-packages/requests_unixsocket/__pycache__/testutils.cpython-38.pyc
U

��V]�@stdZddlZddlZddlZddlZddlZddlZe�e�Z	Gdd�dej
�ZGdd�d�ZGdd�dej
�Z
dS)	a
Utilities helpful for writing tests

Provides a UnixSocketServerThread that creates a running server, listening on a
newly created unix socket.

Example usage:

.. code-block:: python

    def test_unix_domain_adapter_monkeypatch():
        with UnixSocketServerThread() as usock_thread:
            with requests_unixsocket.monkeypatch('http+unix://'):
                urlencoded_usock = quote_plus(usock_process.usock)
                url = 'http+unix://%s/path/to/page' % urlencoded_usock
                r = requests.get(url)
�Ncs$eZdZ�fdd�Zdd�Z�ZS)�
KillThreadcstt|�j||�||_dS�N)�superr�__init__�server)�selfr�args�kwargs��	__class__��?/usr/lib/python3/dist-packages/requests_unixsocket/testutils.pyrszKillThread.__init__cCs$t�d�t�d�|jj��dS)N�ZSleeping)�time�sleep�logger�debugr�_map�clear�rrrr
�run#s

zKillThread.run)�__name__�
__module__�__qualname__rr�
__classcell__rrr
r
rsrc@seZdZdZdd�ZdS)�WSGIAppNcCsht�d|d�t�d|�d}dd|dfd|d	fd
|dfg}d}|||�t�d|||�|gS)
Nz WSGIApp.__call__: Invoked for %sZ	PATH_INFOzWSGIApp.__call__: environ = %rz200 OK)zX-Transportzunix domain socketz
X-Socket-PathZSERVER_PORTzX-Requested-Query-StringZQUERY_STRINGzX-Requested-PathsHello world!zZWSGIApp.__call__: Responding with status_text = %r; response_headers = %r; body_bytes = %r)rr)r�environZstart_responseZstatus_textZresponse_headersZ
body_bytesrrr
�__call__,s"


�
�zWSGIApp.__call__)rrrrrrrrr
r)srcs<eZdZ�fdd�Zdd�Zdd�Zdd�Zd	d
�Z�ZS)�UnixSocketServerThreadcs0tt|�j||�|��|_d|_t��|_dSr)	rrr�get_tempfile_name�usockr�	threadingZEvent�server_ready_event)rrr	r
rr
rAs
zUnixSocketServerThread.__init__cCs,t�t�jt��t��jdd�f}d|S)Ni����z/tmp/test_requests.%s_%s_%s)�os�stat�__file__�st_ino�getpid�uuidZuuid4�hex�rrrrr
rGs$z(UnixSocketServerThread.get_tempfile_namecCsDt�d|�t�}tj||jd�}||_||_|j��|�	�dS)NzCall waitress.serve in %r ...)Zunix_socket)
rrr�waitressZ
create_serverr rr"�setr)rZwsgi_apprrrr
rNs
zUnixSocketServerThread.runcCs0t�d|�|��t�d|�|j��|S)NzStarting %r ...zStarted %r.)rr�startr"�waitrrrr
�	__enter__Ws

z UnixSocketServerThread.__enter__cGs"|j��|jrt|j���dSr)r"r.rrr-r*rrr
�__exit__^s
zUnixSocketServerThread.__exit__)	rrrrrrr/r0rrrr
r
r@s
	r)�__doc__Zloggingr#r!rr(r+Z	getLoggerrrZThreadrrrrrrr
�<module>s