HEX
Server: Apache
System: Linux scp1.abinfocom.com 5.4.0-216-generic #236-Ubuntu SMP Fri Apr 11 19:53:21 UTC 2025 x86_64
User: confeduphaar (1010)
PHP: 8.1.33
Disabled: exec,passthru,shell_exec,system
Upload Files
File: //lib/python3/dist-packages/twisted/trial/__pycache__/_asynctest.cpython-38.pyc
U

��W[*9�@s�dZddlmZmZddlZddlZddlmZddlm	Z	m
Z
ddlmZddl
mZmZddlmZmZmZgZeej�Gd	d
�d
e��ZdS)zP
Things likely to be used by writers of unit tests.

Maintainer: Jonathan Lange
�)�division�absolute_importN)�implementer)�defer�utils)�failure)�itrial�util)�FailTest�SkipTest�SynchronousTestCasecs�eZdZdZd3�fdd�	Zdd�ZeZdd�Zd	d
�Zdd�Z	d
d�Z
dd�Zdd�Zdd�Z
dd�Zdd�Zdd�Zdd�Zdd�Zdd �Zd!d"�Zd#d$�Zd%d&�Zd'd(�Zd)d*�Z�fd+d,�Zd-d.�Zd/d0�Zefd1d2�Z�ZS)4�TestCasea=
    A unit test. The atom of the unit testing universe.

    This class extends L{SynchronousTestCase} which extends C{unittest.TestCase}
    from the standard library. The main feature is the ability to return
    C{Deferred}s from tests and fixture methods and to have the suite wait for
    those C{Deferred}s to fire.  Also provides new assertions such as
    L{assertFailure}.

    @ivar timeout: A real number of seconds. If set, the test will
    raise an error if it takes longer than C{timeout} seconds.
    If not set, util.DEFAULT_TIMEOUT_DURATION is used.
    �runTestcstt|��|�dS)a�
        Construct an asynchronous test case for C{methodName}.

        @param methodName: The name of a method on C{self}. This method should
        be a unit test. That is, it should be a short method that calls some of
        the assert* methods. If C{methodName} is unspecified,
        L{SynchronousTestCase.runTest} will be used as the test method. This is
        mostly useful for testing Trial.
        N)�superr
�__init__)�self�
methodName��	__class__��:/usr/lib/python3/dist-packages/twisted/trial/_asynctest.pyr.s
zTestCase.__init__cs&�fdd�}��fdd�}|�||�S)z�
        Fail if C{deferred} does not errback with one of C{expectedFailures}.
        Returns the original Deferred with callbacks added. You will need
        to return this Deferred from your test case.
        cs��d|f��dS)Nz&did not catch an error, instead got %r)�failureException)�ignore�rrr�_cbAs�z#TestCase.assertFailure.<locals>._cbcs.|j��r|jSd�t|�f}��|��dS)Nz
Expected: %r
Got:
%s)�check�value�strr)r�output��expectedFailuresrrr�_ebEs

�z#TestCase.assertFailure.<locals>._eb)�addCallbacks)rZdeferredr rr!rrr�
assertFailure;szTestCase.assertFailurecs�ddlm����������fdd�}t�|tjtd��}t���}t	�
|�rhtd|f�}t�
|�St�tj���|�}���||��|��fdd��|S)	Nr��reactorcs�t�d���f�}t�|�}z|�|�WnZtjk
r����d�_���}|dk	rv|�	|�rv��
�||�n���|�YnXdS)Nz %r (%s) still running at %s secsT)r�TimeoutErrorr�FailureZerrbackZAlreadyCalledError�crash�	_timedOut�getTodo�expected�addExpectedFailure�addError)�d�e�f�todo)rr%�resultr�timeoutrr�	onTimeoutSs�
z TestCase._run.<locals>.onTimeout��categoryz7%r is a generator function and therefore will never runcs���r���p|S�N)ZactiveZcancel)�x)�callrr�<lambda>r�zTestCase._run.<locals>.<lambda>)�twisted.internetr%�
getTimeoutr�suppressWarningsr	�suppress�DeprecationWarning�getattr�inspectZisgeneratorfunction�	TypeErrorrZfailZ
maybeDeferredZrunWithWarningsSuppressed�_getSuppressZ	callLater�addBoth)rrr2r4�method�excr.r)r9rr%r2rr3r�_runPs.
�

��
�z
TestCase._runcOs|j||�Sr7)�run)r�args�kwargsrrr�__call__vszTestCase.__call__cCs*|�d|�}|j|j|j|f|fd�|S)N�setUp�ZcallbackArgsZerrbackArgs)rHr"�deferTestMethod�
_ebDeferSetUp�r�ignoredr2r.rrr�
deferSetUpzs�zTestCase.deferSetUpcCsN|�t�r$|�||�|j|j��n|�||�|�t�rB|��|�	d|�Sr7)
rr�addSkip�_getSkipReasonrMrr-�KeyboardInterrupt�stop�deferRunCleanups�rrr2rrrrP�s

zTestCase._ebDeferSetUpcCsH|�|j|�}|j|j|j|f|fd�|�|j|�|�|j|�|S)NrN)rH�_testMethodNamer"�_cbDeferTestMethod�_ebDeferTestMethodrErX�
deferTearDownrQrrrrO�s�zTestCase.deferTestMethodcCs(|��dk	r|�||���nd|_|S)NT)r*ZaddUnexpectedSuccess�_passed)rrRr2rrrr[�szTestCase._cbDeferTestMethodcCs�|��}|dk	r*|�|�r*|�|||�nr|�|jt�rF|�||�nV|�t�rf|�||�|�	�n6|�t
�r�|�||�t
||j�|j��n|�||�dSr7)r*r+r,rrr
Z
addFailurerVr-rWrrTrUrArZr)rr0r2r1rrrr\�s


�zTestCase._ebDeferTestMethodcCs|�d|�}|�|j|�|S)NZtearDown)rHZ
addErrback�_ebDeferTearDownrQrrrr]�szTestCase.deferTearDowncCs(|�||�|�t�r|��d|_dS�NF)r-rrVrWr^rYrrrr_�s
zTestCase._ebDeferTearDowncCs|��}|�|j|�|S)zd
        Run any scheduled cleanups and report errors (if any to the result
        object.
        )�_runCleanupsZaddCallback�_cbDeferRunCleanupsrQrrrrX�szTestCase.deferRunCleanupscCs@|D]6\}}|tjkr|�||�|�t�r4|��d|_qdSr`)r�FAILUREr-rrVrWr^)rZcleanupResultsr2�flagZtestFailurerrrrb�s

zTestCase._cbDeferRunCleanupscCs�zt�||���}|sd|_Wn"|�|t���d|_YnX|j��D]}|�||�d|_qL|�	�|�
�|jr�|�|�dSr`)r	�_JanitorZpostCaseCleanupr^r-rr'Z	_observerZ	getErrorsZflushLoggedErrorsZ_removeObserverZ
addSuccess)rr2Zclean�errorrrr�_cleanUp�s
zTestCase._cleanUpcCs6zt�||���Wn|�|t���YnXdSr7)r	reZpostClassCleanupr-rr')rr2rrr�
_classCleanUp�szTestCase._classCleanUpcs��fdd�}|S)z�
        Create a method which wraps the reactor method C{name}. The new
        method issues a deprecation warning and calls the original.
        cs(tjd��fdtd��j�||�S)Nz{reactor.%s cannot be used inside unit tests. In the future, using %s will fail the test and may crash or hang the test run.�)�
stacklevelr6)�warnings�warnr@�_reactorMethods)�a�kw��namerrr�_�s��z&TestCase._makeReactorMethod.<locals>._r)rrqrrrrpr�_makeReactorMethod�szTestCase._makeReactorMethodcCs6i|_dD]&}t||�|j|<t|||�|��q
dS)z�
        Deprecate C{iterate}, C{crash} and C{stop} on C{reactor}. That is,
        each method is wrapped in a function that issues a deprecation
        warning, then calls the original.

        @param reactor: The Twisted reactor.
        )r(ZiteraterWN)rmrA�setattrrs)rr%rqrrr�_deprecateReactor�szTestCase._deprecateReactorcCs*|j��D]\}}t|||�q
i|_dS)z�
        Restore the deprecated reactor methods. Undoes what
        L{_deprecateReactor} did.

        @param reactor: The Twisted reactor.
        N)rm�itemsrt)rr%rqrFrrr�_undeprecateReactorszTestCase._undeprecateReactorcCsHdd�}g}t|j�dkr>|j��\}}}|�||||��qt�|�S)z�
        Run the cleanups added with L{addCleanup} in order.

        @return: A C{Deferred} that fires when all cleanups are run.
        cs���fdd�S)Ncs
����Sr7rr�rJr0rKrrr:r;z>TestCase._runCleanups.<locals>._makeFunction.<locals>.<lambda>r)r0rJrKrrxr�
_makeFunctionsz,TestCase._runCleanups.<locals>._makeFunctionr)�lenZ	_cleanups�pop�appendr	Z_runSequentially)rryZ	callablesr0rJrKrrrraszTestCase._runCleanupsc	Csdddlm}|�|�d|_z6|�d|�}z|�|�W5|�|�|�|�XW5|�|�XdS)z�
        Really run C{setUp}, the test method, and C{tearDown}.  Any of these may
        return L{defer.Deferred}s. After they complete, do some reactor cleanup.

        @param result: A L{TestResult} object.
        rr$FN)	r<r%rur)rwrSrgrh�_wait)rr2r%r.rrr�_runFixturesAndTests

zTestCase._runFixturesAndTestcstt|�j|f|�|�S)a	
        Extend the base cleanup feature with support for cleanup functions which
        return Deferreds.

        If the function C{f} returns a Deferred, C{TestCase} will wait until the
        Deferred has fired before proceeding to the next function.
        )rr
�
addCleanup)rr0rJrKrrrr4szTestCase.addCleanupcCs|��Sr7)rDrrrr�getSuppress?szTestCase.getSuppressc	CsNt�|jdtj�}z
t|�WSttfk
rHtjdt	d�tjYSXdS)ae
        Returns the timeout value set on this test. Checks on the instance
        first, then the class, then the module, then packages. As soon as it
        finds something with a C{timeout} attribute, returns that. Returns
        L{util.DEFAULT_TIMEOUT_DURATION} if it cannot find anything. See
        L{TestCase} docstring for more details.
        r3z)'timeout' attribute needs to be a number.r5N)
r	ZacquireAttributeZ_parentsZDEFAULT_TIMEOUT_DURATION�float�
ValueErrorrCrkrlr@)rr3rrrr=Cs
�
�zTestCase.getTimeoutcs�|rtd��ddlm�g��fdd�}��fdd�}t�|tjdtd	��}�fd
d�}t�|tjdtd	��}|�d�zV|�
|��r�W�DdS|�
|�|�_z���W5�`X�s�|j
r�W�dSt��W5d�|�	�XdS)
zJTake a Deferred that only ever callbacks. Block until it happens.
        z_wait is not reentrantrr$cs�dk	r��|�dSr7)r|)�any)�resultsrrr|aszTestCase._wait.<locals>.appendcs�dk	r���dSr7�r()Zign�r%r�rrr(dszTestCase._wait.<locals>.crashzreactor\.crash cannot be used.*)�messager6cs���dSr7r�rr$rrrWjszTestCase._wait.<locals>.stopN)�RuntimeErrorr<r%rr>r	r?r@r|r{rErWrIr)rV)rr.�runningr|r(rWrr�rr}YsD����




zTestCase._wait)r)�__name__�
__module__�__qualname__�__doc__rr#ZfailUnlessFailurerHrLrSrPrOr[r\r]r_rXrbrgrhrsrurwrar~rr�r=�_wait_is_runningr}�
__classcell__rrrrr
s4
&


	r
)r�Z
__future__rrrBrkZzope.interfacerr<rrZtwisted.pythonrZ
twisted.trialrr	Ztwisted.trial._synctestr
rrr�Z	ITestCaser
rrrr�<module>s