File: //lib/python3/dist-packages/twisted/test/__pycache__/test_iutils.cpython-38.pyc
U


W[�4�@s�dZddlmZmZddlZddlZddlZddlZddlZddl	m
Z
ddlmZddl
mZddlmZmZmZmZddlmZdd	lmZGd
d�dej�ZGdd
�d
ej�ZGdd�de�ZdS)zD
Test running processes with the APIs in L{twisted.internet.utils}.
�)�division�absolute_importN)�_PY3)�platform)�unittest)�error�reactor�utils�
interfaces)�Deferred)�SuppressedWarningsTestsc@s�eZdZdZe�ed�dkr dZdZdZ	e
jZdd�Z
dd�Zdd	�Zd
d�Zdd
�Zdd�Zdd�Ze��rtde_dd�Zdd�Zdd�Zdd�Zdd�Zdd�Zdd �Zd!d"�ZdS)#�ProcessUtilsTestszt
    Test running a process using L{getProcessOutput}, L{getProcessValue}, and
    L{getProcessOutputAndValue}.
    Nz)reactor doesn't implement IReactorProcessc	CsB|��}t|d��}|�tj�|�tj�W5QRXtj�|�S)zj
        Write the given list of lines to a text file and return the absolute
        path to it.
        Zwt)�mktemp�open�write�os�linesep�join�path�abspath)�selfZsourceLinesZscript�
scriptFile�r�:/usr/lib/python3/dist-packages/twisted/test/test_iutils.py�makeSourceFile!s"z ProcessUtilsTests.makeSourceFilecCs>|�ddddddddd	d
g
�}t�|jd|g�}|�|jd�S)
z�
        L{getProcessOutput} returns a L{Deferred} which fires with the complete
        output of the process it runs after that process exits.
        �
import syszfor s in b'hello world\n':z%    if hasattr(sys.stdout, 'buffer'):z        # Python 3z        s = bytes([s])z"        sys.stdout.buffer.write(s)z	    else:z        # Python 2z        sys.stdout.write(s)z    sys.stdout.flush()�-ushello world
�rr	�getProcessOutput�exe�addCallback�assertEqual�rr�drrr�test_output,s�zProcessUtilsTests.test_outputcsF��ddg�}t��jd|g�}��|t�}�fdd�}|�|�|S)z�
        The L{Deferred} returned by L{getProcessOutput} is fired with an
        L{IOError} L{Failure} if the child process writes to stderr.
        rz!sys.stderr.write("hello world\n")rcs��|jtj�S�N)�
assertFailureZprocessEndedrZProcessDone)�err�rrr�cbFailedMsz?ProcessUtilsTests.test_outputWithErrorIgnored.<locals>.cbFailed)rr	rrr&�IOErrorr )rrr#r)rr(r�test_outputWithErrorIgnored@s�
z-ProcessUtilsTests.test_outputWithErrorIgnoredcCs8|�dddddg�}tj|jd|gdd�}|�|jd	�S)
z�
        If a C{True} value is supplied for the C{errortoo} parameter to
        L{getProcessOutput}, the returned L{Deferred} fires with the child's
        stderr output as well as its stdout output.
        rzsys.stdout.write("foo")�sys.stdout.flush()zsys.stderr.write("foo")�sys.stderr.flush()rT)Zerrortoosfoofoorr"rrr�test_outputWithErrorCollectedSs�	z/ProcessUtilsTests.test_outputWithErrorCollectedcCs,|�dg�}t�|jd|g�}|�|jd�S)z|
        The L{Deferred} returned by L{getProcessValue} is fired with the exit
        status of the child process.
        zraise SystemExit(1)r�)rr	�getProcessValuerr r!r"rrr�
test_valuefszProcessUtilsTests.test_valuecsF��ddddddddd	d
g
�}�fdd�}t��jd
|g�}|�|�S)a
        The L{Deferred} returned by L{getProcessOutputAndValue} fires with a
        three-tuple, the elements of which give the data written to the child's
        stdout, the data written to the child's stderr, and the exit status of
        the child.
        rz!if hasattr(sys.stdout, 'buffer'):z    # Python 3z.    sys.stdout.buffer.write(b'hello world!\n')z0    sys.stderr.buffer.write(b'goodbye world!\n')zelse:z    # Python 2z'    sys.stdout.write(b'hello world!\n')z)    sys.stderr.write(b'goodbye world!\n')zsys.exit(1)csJ|\}}}��|d�tr(��|d�n��|dtj���|d�dS)Ns
hello world!
sgoodbye world!
sgoodbye world!r/)r!rrr)Zout_err_code�outr'�coder(rr�gotOutputAndValue�s
�z@ProcessUtilsTests.test_outputAndValue.<locals>.gotOutputAndValuer)rr	�getProcessOutputAndValuerr �rrr4r#rr(r�test_outputAndValueqs�
	z%ProcessUtilsTests.test_outputAndValuecsJ��ddddddg�}�fdd�}t��jd	|g�}��|t�}|�|�S)
z�
        If the child process exits because of a signal, the L{Deferred}
        returned by L{getProcessOutputAndValue} fires a L{Failure} of a tuple
        containing the child's stdout, stderr, and the signal which caused
        it to exit.
        zimport sys, os, signalz"sys.stdout.write('stdout bytes\n')z"sys.stderr.write('stderr bytes\n')r,r-z$os.kill(os.getpid(), signal.SIGKILL)cs4|\}}}��|d���|d���|tj�dS)Ns
stdout bytes
s
stderr bytes
)r!�signal�SIGKILL)Zout_err_sigr2r'Zsigr(rrr4�s
z>ProcessUtilsTests.test_outputSignal.<locals>.gotOutputAndValuer)rr	r5rr&�tupler r6rr(r�test_outputSignal�s
�z#ProcessUtilsTests.test_outputSignalz"Windows doesn't have real signals.cCsVtj�|���}t�|�|�ddg�}||jd|g|d�}|�||�t	�
���|S)Nzimport os, sys�sys.stdout.write(os.getcwd())r)r)rrrr�makedirsrrr �encode�sys�getfilesystemencoding�rZutilFunc�check�dirrr#rrr�	_pathTest�s
�zProcessUtilsTests._pathTestcCs|�tj|j�S)z
        L{getProcessOutput} runs the given command with the working directory
        given by the C{path} parameter.
        )rDr	rr!r(rrr�test_getProcessOutputPath�sz+ProcessUtilsTests.test_getProcessOutputPathcs�fdd�}��tj|�S)z~
        L{getProcessValue} runs the given command with the working directory
        given by the C{path} parameter.
        cs��|d�dS�Nr�r!��resultZignoredr(rrrB�sz9ProcessUtilsTests.test_getProcessValuePath.<locals>.check)rDr	r0�rrBrr(r�test_getProcessValuePath�sz*ProcessUtilsTests.test_getProcessValuePathcs�fdd�}��tj|�S)z�
        L{getProcessOutputAndValue} runs the given command with the working
        directory given by the C{path} parameter.
        cs&|\}}}��||���|d�dSrFrG�Zout_err_statusrCr2r'Zstatusr(rrrB�s
zBProcessUtilsTests.test_getProcessOutputAndValuePath.<locals>.check)rDr	r5rJrr(r�!test_getProcessOutputAndValuePath�sz3ProcessUtilsTests.test_getProcessOutputAndValuePathc	Cs�tj�|���}t�|�|�dd|fdg�}|�tjt���t�|�|�tj	|t
�t�
d�j��t�	|d�||j
dd|g�}|�||�t����|S)Nzimport os, sys, statzos.chmod(%r, stat.S_IXUSR)r<�.rz-Sr)rrrrr=rZ
addCleanup�chdir�getcwd�chmod�stat�S_IMODE�st_moderr r>r?r@rArrr�_defaultPathTest�s$
�	
�z"ProcessUtilsTests._defaultPathTestcCs|�tj|j�S)a
        If no value is supplied for the C{path} parameter, L{getProcessOutput}
        runs the given command in the same working directory as the parent
        process and succeeds even if the current working directory is not
        accessible.
        )rUr	rr!r(rrr� test_getProcessOutputDefaultPath�sz2ProcessUtilsTests.test_getProcessOutputDefaultPathcs�fdd�}��tj|�S)a
        If no value is supplied for the C{path} parameter, L{getProcessValue}
        runs the given command in the same working directory as the parent
        process and succeeds even if the current working directory is not
        accessible.
        cs��|d�dSrFrGrHr(rrrBsz@ProcessUtilsTests.test_getProcessValueDefaultPath.<locals>.check)rUr	r0rJrr(r�test_getProcessValueDefaultPathsz1ProcessUtilsTests.test_getProcessValueDefaultPathcs�fdd�}��tj|�S)a	
        If no value is supplied for the C{path} parameter,
        L{getProcessOutputAndValue} runs the given command in the same working
        directory as the parent process and succeeds even if the current
        working directory is not accessible.
        cs&|\}}}��||���|d�dSrFrGrLr(rrrBs
zIProcessUtilsTests.test_getProcessOutputAndValueDefaultPath.<locals>.check)rUr	r5rJrr(r�(test_getProcessOutputAndValueDefaultPaths
�z:ProcessUtilsTests.test_getProcessOutputAndValueDefaultPath)�__name__�
__module__�__qualname__�__doc__r
ZIReactorProcessr�skip�output�valuer?�
executablerrr$r+r.r1r7r;rZ	isWindowsrDrErKrMrUrVrWrXrrrrr
s.!
 
r
c@seZdZdZdd�ZdS)�SuppressWarningsTestsz.
    Tests for L{utils.suppressWarnings}.
    cs�g��fdd�}|�td|�dd�}t�|dtdd�f�}|d	�|�t��d
�|d�|�t��d
�|d�|�t��d�d
S)zs
        L{utils.suppressWarnings} decorates a function so that the given
        warnings are suppressed.
        cs��||f�dSr%)�append)r�a�kw�rIrr�showwarning*sz@SuppressWarningsTests.test_suppressWarnings.<locals>.showwarningrfcSst�|�dSr%)�warnings�warn)�msgrrr�f.sz6SuppressWarningsTests.test_suppressWarnings.<locals>.f)�ignorezThis is message��messagezSanity check messager/zUnignored message�N)Zpatchrgr	ZsuppressWarnings�dictr!�len)rrfrj�grrer�test_suppressWarnings$sz+SuppressWarningsTests.test_suppressWarningsN)rYrZr[r\rrrrrrra srac@s*eZdZdZeej�Zdd�Zdd�ZdS)�DeferredSuppressedWarningsTestsz`
    Tests for L{utils.runWithWarningsSuppressed}, the version that supports
    Deferreds.
    cshdifdifg}t��|�|�fdd��t�d���d�t�d�|�dgdd	�|��D��d
S)z�
        If the function called by L{utils.runWithWarningsSuppressed} returns a
        C{Deferred}, the warning filters aren't removed until the Deferred
        fires.
        �rkz.*foo.*�rkz.*bar.*cs�Sr%rrrerr�<lambda>U�zGDeferredSuppressedWarningsTests.test_deferredCallback.<locals>.<lambda>�
ignore foo��ignore foo 2cSsg|]}|d�qSrlr��.0�wrrr�
<listcomp>ZszIDeferredSuppressedWarningsTests.test_deferredCallback.<locals>.<listcomp>N)r�runWithWarningsSuppressedrgrh�callbackr!�
flushWarnings)r�filtersrrer�test_deferredCallbackLs�


�z5DeferredSuppressedWarningsTests.test_deferredCallbackcsxdifdifg}t��|�|�fdd��}t�d���t��|�dd��t�d�|�dgdd	�|��D��d
S)z�
        If the function called by L{utils.runWithWarningsSuppressed} returns a
        C{Deferred}, the warning filters aren't removed until the Deferred
        fires with an errback.
        rtrucs�Sr%rrrerrrvfrwzFDeferredSuppressedWarningsTests.test_deferredErrback.<locals>.<lambda>rxcSs
|�t�Sr%)Ztrap�ZeroDivisionError)rjrrrrvirwrzcSsg|]}|d�qSrlrr{rrrr~lszHDeferredSuppressedWarningsTests.test_deferredErrback.<locals>.<listcomp>N)	rrrgrhZerrbackr�Z
addErrbackr!r�)rr�r#rrer�test_deferredErrback]s�

�z4DeferredSuppressedWarningsTests.test_deferredErrbackN)	rYrZr[r\�staticmethodr	rr�r�rrrrrsCs
rs)r\Z
__future__rrrgrrRr?r8Ztwisted.python.compatrZtwisted.python.runtimerZ
twisted.trialrZtwisted.internetrrr	r
Ztwisted.internet.deferrZtwisted.python.test.test_utilrZTestCaser
ZSynchronousTestCaserarsrrrr�<module>s(#