File: //usr/lib/python3/dist-packages/requests_unixsocket/__pycache__/testutils.cpython-38.pyc
U
��V] � @ st d Z ddlZddlZddlZddlZddlZddlZe�e�Z G dd� dej
�ZG dd� d�ZG dd� dej
�Z
dS ) a
Utilities helpful for writing tests
Provides a UnixSocketServerThread that creates a running server, listening on a
newly created unix socket.
Example usage:
.. code-block:: python
def test_unix_domain_adapter_monkeypatch():
with UnixSocketServerThread() as usock_thread:
with requests_unixsocket.monkeypatch('http+unix://'):
urlencoded_usock = quote_plus(usock_process.usock)
url = 'http+unix://%s/path/to/page' % urlencoded_usock
r = requests.get(url)
� Nc s$ e Zd Z� fdd�Zdd� Z� ZS )�
KillThreadc s t t| �j||� || _d S �N)�superr �__init__�server)�selfr �args�kwargs�� __class__� �?/usr/lib/python3/dist-packages/requests_unixsocket/testutils.pyr s zKillThread.__init__c C s$ t �d� t�d� | jj�� d S )N� ZSleeping)�time�sleep�logger�debugr �_map�clear�r r r r
�run# s
zKillThread.run)�__name__�
__module__�__qualname__r r �
__classcell__r r r
r
r s r c @ s e Zd ZdZdd� ZdS )�WSGIAppNc C sh t �d|d � t �d|� d}dd|d fd|d fd
|d fg}d}|||� t �d|||� |gS )
Nz WSGIApp.__call__: Invoked for %sZ PATH_INFOzWSGIApp.__call__: environ = %rz200 OK)zX-Transportzunix domain socketz
X-Socket-PathZSERVER_PORTzX-Requested-Query-StringZQUERY_STRINGzX-Requested-Paths Hello world!zZWSGIApp.__call__: Responding with status_text = %r; response_headers = %r; body_bytes = %r)r r )r �environZstart_responseZstatus_textZresponse_headersZ
body_bytesr r r
�__call__, s"