File: //usr/lib/python3/dist-packages/twisted/test/__pycache__/test_threadpool.cpython-38.pyc
U


W[�V�@s�dZddlmZmZddlZddlZddlZddlZddlZddl	m
Z
ddlmZddl
mZmZmZmZddlmZmZGdd	�d	e�Ze�e�Gd
d�dej�ZGdd
�d
ej�ZGdd�dej�ZGdd�de�ZGdd�dej�ZdS)z(
Tests for L{twisted.python.threadpool}
�)�division�absolute_importN)�range)�unittest)�
threadpool�
threadable�failure�context)�Team�createMemoryWorkerc@s&eZdZdZdd�Zdd�ZdgZdS)�SynchronizationrcCs ||_||_t��|_g|_dS�N)�N�waiting�	threading�Lock�lock�runs)�selfrr�r�>/usr/lib/python3/dist-packages/twisted/test/test_threadpool.py�__init__s
zSynchronization.__init__cCs||j�d�r0t|j�ds$t�d�|j��n|jd7_|j��|j�d�t|j�|j	krn|j
��|j��dS)NF�g-C��6*?�)r�acquire�lenr�time�sleep�release�failures�appendrr�rrrr�run!s


zSynchronization.runr"N)�__name__�
__module__�__qualname__rrr"Zsynchronizedrrrrrsrc@s�eZdZdZdd�Zdd�Zdd�Zdd	�Zd
d�Zdd
�Z	dd�Z
dd�Zdd�Zdd�Z
dd�Zdd�Zdd�Zdd�Zdd�Zd d!�Zd"d#�Zd$d%�Zd&S)'�ThreadPoolTestsz
    Test threadpools.
    cCsdS)zD
        Return number of seconds to wait before giving up.
        rrr!rrr�
getTimeoutFszThreadPoolTests.getTimeoutcCs8td�}|D]}|�d�rq4t�d�q|�d�dS)Ni@BFg�h㈵��>z%A long time passed without succeeding)rrrr�fail)rr�items�irrr�_waitForLockMs
zThreadPoolTests._waitForLockcCs,t�dd�}|�|jd�|�|jd�dS)zy
        L{ThreadPool.min} and L{ThreadPool.max} are set to the values passed to
        L{ThreadPool.__init__}.
        ��N)r�
ThreadPool�assertEqual�min�max�r�poolrrr�test_attributesWszThreadPoolTests.test_attributescCszt�dd�}|��|�|j�|�t|j�d�t�dd�}|�t|j�d�|��|�|j�|�t|j�d�dS)zV
        L{ThreadPool.start} creates the minimum number of threads specified.
        rr��
N)rr.�start�
addCleanup�stopr/r�threadsr2rrr�
test_startaszThreadPoolTests.test_startcCs<t�dd�}|��|��|�d�|�t|j�d�dS)z�
        L{ThreadPool.adjustPoolsize} only modifies the pool size and does not
        start new workers while the pool is not running.
        rr�N)rr.r7r9ZadjustPoolsizer/rr:r2rrr�test_adjustingWhenPoolStoppedqs

z-ThreadPoolTests.test_adjustingWhenPoolStoppedcCs�t�dd�}|��|�|j�|�|jg�dd�}Gdd�dt�}|�}t�	|�}t�	|�}|�
||�t��}|�
|j
�|�|���~~t��|�|��|�|��dS)z�
        Test that creating threads in the threadpool with application-level
        objects as arguments doesn't result in those objects never being
        freed, with the thread maintaining a reference to them as long as it
        exists.
        rrcSsdSr
r)�argrrr�worker�sz<ThreadPoolTests.test_threadCreationArguments.<locals>.workerc@seZdZdS)z:ThreadPoolTests.test_threadCreationArguments.<locals>.DumbN�r#r$r%rrrr�Dumb�srAN)rr.r7r8r9r/r:�object�weakref�ref�callInThreadr�Event�set�waitr'�gc�collect�assertIsNone)r�tpr?rA�unique�	workerRef�	uniqueRef�eventrrr�test_threadCreationArguments}s$

z,ThreadPoolTests.test_threadCreationArgumentscs,t�dd�}|����|j���|jg�i�t���t���g��������fdd�}�fdd�}Gdd�dt	����}t
�|�}t
�|��t
�|��|j||||d	�~~��
�������t������������~t����|�����d����t����d
d
g�d
S)ze
        As C{test_threadCreationArguments} above, but for
        callInThreadWithCallback.
        rrcsFt�����������d<���d<�����t�|��dS)NrNrO)rIrJrHr'rGr rCrD��success�result)�onResultDone�onResultWait�refdict�	resultRefrrOrNrr�onResult�s

zVThreadPoolTests.test_threadCreationArgumentsCallInThreadWithCallback.<locals>.onResultcs��Sr
r)r>�test)rArrr?�szTThreadPoolTests.test_threadCreationArgumentsCallInThreadWithCallback.<locals>.workerc@seZdZdS)zRThreadPoolTests.test_threadCreationArgumentsCallInThreadWithCallback.<locals>.DumbNr@rrrrrA�srA)rZN)rr.r7r8r9r/r:rrFrBrCrD�callInThreadWithCallbackrGrHr'rIrJrK�list�values)rrLrYr?rMZonResultRefr)rArUrVrWrXrrOrNr�4test_threadCreationArgumentsCallInThreadWithCallback�s8


zDThreadPoolTests.test_threadCreationArgumentsCallInThreadWithCallbackcCsXt�dd�}|�|jd�|�|jd�t�t�|��}|�|jd�|�|jd�dS)z�
        Threadpools can be pickled and unpickled, which should preserve the
        number of threads and other parameters.
        ��N)rr.r/r0r1�pickle�loads�dumps)rr3�copyrrr�test_persistence�sz ThreadPoolTests.test_persistencecCsvd}t��}|��|�|j�t��}|��t||�}t	|�D]}|||�qB|�
|�|�|jd�
|j��dS)z�
        Test synchronization of calls made with C{method}, which should be
        one of the mechanisms of the threadpool to execute work in threads.
        r6zrun() re-entered {} timesN)rr.r7r8r9rrrrrr+�assertFalser�format)r�methodrrLr�actorr*rrr�_threadpoolTests	


�zThreadPoolTests._threadpoolTestcCs|�dd��S)z?
        Call C{_threadpoolTest} with C{callInThread}.
        cSs|�|j�Sr
)rEr")rLrirrr�<lambda>&�z3ThreadPoolTests.test_callInThread.<locals>.<lambda>)rjr!rrr�test_callInThread!s�z!ThreadPoolTests.test_callInThreadcs`Gdd�dt���fdd�}t�dd�}|�|�|��|��|���}|�t|�d�dS)zi
        L{ThreadPool.callInThread} logs exceptions raised by the callable it
        is passed.
        c@seZdZdS)z<ThreadPoolTests.test_callInThreadException.<locals>.NewErrorNr@rrrr�NewError.srncs
���dSr
rr�rnrr�
raiseError1sz>ThreadPoolTests.test_callInThreadException.<locals>.raiseErrorrrN)	�	Exceptionrr.rEr7r9�flushLoggedErrorsr/r)rrprL�errorsrror�test_callInThreadException)s

z*ThreadPoolTests.test_callInThreadExceptioncs�t������g���fdd�}t�dd�}|�|dd��|��z|���W5|��X|�	�d�|�
�dd�dS)	z�
        L{ThreadPool.callInThreadWithCallback} calls C{onResult} with a
        two-tuple of C{(True, result)} where C{result} is the value returned
        by the callable supplied.
        cs �����|���|�dSr
�rr rR��results�waiterrrrYHs
z?ThreadPoolTests.test_callInThreadWithCallback.<locals>.onResultrrcSsdS)NrZrrrrrrkNrlz?ThreadPoolTests.test_callInThreadWithCallback.<locals>.<lambda>rZN)rrrrr.r[r7r9r+�
assertTruer/)rrYrLrrvr�test_callInThreadWithCallback=s
z-ThreadPoolTests.test_callInThreadWithCallbackcs�Gdd�dt���fdd�}t������g���fdd�}t�dd�}|�||�|��z|�	��W5|��X|�
�d�|��dtj
�|�t�dj���d	S)
z�
        L{ThreadPool.callInThreadWithCallback} calls C{onResult} with a
        two-tuple of C{(False, failure)} where C{failure} represents the
        exception raised by the callable supplied.
        c@seZdZdS)zRThreadPoolTests.test_callInThreadWithCallbackExceptionInCallback.<locals>.NewErrorNr@rrrrrn`srncs
���dSr
rrrorrrpcszTThreadPoolTests.test_callInThreadWithCallbackExceptionInCallback.<locals>.raiseErrorcs �����|���|�dSr
rurRrvrrrYks
zRThreadPoolTests.test_callInThreadWithCallbackExceptionInCallback.<locals>.onResultrrN)rqrrrrr.r[r7r9r+rfZassertIsInstancerZFailurery�
issubclass�type)rrprYrLr)rnrwrxr�0test_callInThreadWithCallbackExceptionInCallbackZs
z@ThreadPoolTests.test_callInThreadWithCallbackExceptionInCallbackcs�Gdd�dt��t��}|��g���fdd�}t�dd�}|�|dd��|�|j�|�	�z|�|�W5|�
�X|���}|�
t|�d�|��d�|��d�d	S)
zj
        L{ThreadPool.callInThreadWithCallback} logs the exception raised by
        C{onResult}.
        c@seZdZdS)zRThreadPoolTests.test_callInThreadWithCallbackExceptionInOnResult.<locals>.NewErrorNr@rrrrrn�srncs��|���|����dSr
)r rR�rnrwrrrY�s

zRThreadPoolTests.test_callInThreadWithCallbackExceptionInOnResult.<locals>.onResultrrcSsdSr
rrrrrrk�rlzRThreadPoolTests.test_callInThreadWithCallbackExceptionInOnResult.<locals>.<lambda>N)rqrrrrr.r[rErr7r9r+rrr/rryrK)rrxrYrLrsrr~r�0test_callInThreadWithCallbackExceptionInOnResult~s 

z@ThreadPoolTests.test_callInThreadWithCallbackExceptionInOnResultcs�g�t�����fdd�}�fdd�}t�dd�}|�||�|��|�|j���|�	��|�
t��d�|�
�d�d�dS)	z�
        L{ThreadPool.callInThreadWithCallback} calls the function it is
        given and the C{onResult} callback in the same thread.
        cs��t��j����dSr
)r r�
currentThread�identrGrR�rP�	threadIdsrrrY�sz5ThreadPoolTests.test_callbackThread.<locals>.onResultcs��t��j�dSr
)r rr�r�r)r�rr�func�sz1ThreadPoolTests.test_callbackThread.<locals>.funcrrr<N)rrFrr.r[r7r8r9rHr'r/r)rrYr�rLrr�r�test_callbackThread�sz#ThreadPoolTests.test_callbackThreadcs�tj��jd}d|d<g�t�����fdd�}�fdd�}t�dd	�}|�||�|�	�|�
|j���|�
��|�t��d
�|�|�d�|�|�d	�dS)z�
        The context L{ThreadPool.callInThreadWithCallback} is invoked in is
        shared by the context the callable and C{onResult} callback are
        invoked in.
        ���zthis must be presentZtestingcs&tj��jd}��|����dS�Nr�)r	�theContextTracker�currentContext�contextsr rG)rSrT�ctx�r�rPrrrY�s
z6ThreadPoolTests.test_callbackContext.<locals>.onResultcstj��jd}��|�dSr�)r	r�r�r�r )r�)r�rrr��sz2ThreadPoolTests.test_callbackContext.<locals>.funcrrr<N)r	r�r�r�rrFrr.r[r7r8r9rHr'r/r)rZmyctxrYr�rLrr�r�test_callbackContext�sz$ThreadPoolTests.test_callbackContextcCsNt��}|��t�dd�}|�|j�|��z|�	|�W5|��XdS)z�
        Work added to the threadpool before its start should be executed once
        the threadpool is started: this is ensured by trying to release a lock
        previously acquired.
        rrN)
rrrrr.rErr7r9r+)rrxrLrrr�test_existingWork�sz!ThreadPoolTests.test_existingWorkcs�t�dd�}|��|�|j�|�|jd�|�t|j�d�|�t|j	�d�t
���t
�����fdd�}|�|���
d�|�|jd�|�t|j�d�|�t|j	�d����t|j�s�t�d�q�|�t|j�d�|�t|j	�d�dS)z{
        As the worker receives and completes work, it transitions between
        the working and waiting states.
        rrcs�����d�dS)Nr6)rGrHr�ZthreadFinishZ
threadWorkingrr�_threadsz;ThreadPoolTests.test_workerStateTransition.<locals>._threadr6g����Mb@?N)rr.r7r8r9r/�workersr�waitersZworkingrrFrErHrGrr)rr3r�rr�r�test_workerStateTransition�s&


z*ThreadPoolTests.test_workerStateTransitionN)r#r$r%�__doc__r'r+r4r;r=rQr^rerjrmrtrzr}rr�r�r�r�rrrrr&As&

+K$#"r&c@s$eZdZdd�Zdd�Zdd�ZdS)�RaceConditionTestscs<t�dd��_t���_�j���fdd�}��|�dS)Nrr6cs�j���`dSr
)rr9rr!rr�dones
z&RaceConditionTests.setUp.<locals>.done)rr.rrFrPr7r8)rr�rr!r�setUps


zRaceConditionTests.setUpcCsdS)z=
        A reasonable number of seconds to time out.
        rrr!rrrr'%szRaceConditionTests.getTimeoutcCs�|��}|j�|jj�|j�|�|j��td�D]}|j�|jj�q6|j�|jj�|j�|�|j��s�|j��|�	d�dS)a�
        If multiple threads are waiting on an event (via blocking on something
        in a callable passed to L{threadpool.ThreadPool.callInThread}), and
        there is spare capacity in the threadpool, sending another callable
        which will cause those to un-block to
        L{threadpool.ThreadPool.callInThread} will reliably run that callable
        and un-block the blocked threads promptly.

        @note: This is not really a unit test, it is a stress-test.  You may
            need to run it with C{trial -u} to fail reliably if there is a
            problem.  It is very hard to regression-test for this particular
            bug - one where the thread pool may consider itself as having
            "enough capacity" when it really needs to spin up a new thread if
            it possibly can - in a deterministic way, since the bug can only be
            provoked by subtle race conditions.
        r5z9'set' did not run in thread; timed out waiting on 'wait'.N)
r'rrErPrGrH�clearrZisSetr()rZtimeoutr*rrr�test_synchronization,s


�z'RaceConditionTests.test_synchronizationN)r#r$r%r�r'r�rrrrr�sr�c@s eZdZdZdd�Zdd�ZdS)�
MemoryPoolz
    A deterministic threadpool that uses in-memory data structures to queue
    work rather than threads to execute work.
    cOs*||_||_||_tjj|f|�|�dS)a�
        Initialize this L{MemoryPool} with a test case.

        @param coordinator: a worker used to coordinate work in the L{Team}
            underlying this threadpool.
        @type coordinator: L{twisted._threads.IExclusiveWorker}

        @param failTest: A 1-argument callable taking an exception and raising
            a test-failure exception.
        @type failTest: 1-argument callable taking (L{Failure}) and raising
            L{unittest.FailTest}.

        @param newWorker: a 0-argument callable that produces a new
            L{twisted._threads.IWorker} provider on each invocation.
        @type newWorker: 0-argument callable returning
            L{twisted._threads.IWorker}.
        N)�_coordinator�	_failTest�
_newWorkerrr.r)r�coordinatorZfailTest�	newWorker�args�kwargsrrrrSszMemoryPool.__init__cs&���fdd�}t�j|�jd���S)a�
        Override testing hook to create a deterministic threadpool.

        @param currentLimit: A 1-argument callable which returns the current
            threadpool size limit.

        @param threadFactory: ignored in this invocation; a 0-argument callable
            that would produce a thread.

        @return: a L{Team} backed by the coordinator and worker passed to
            L{MemoryPool.__init__}.
        cs&���}|j|j��krdS���Sr
)Z
statisticsZbusyWorkerCountZidleWorkerCountr�)Zstats��currentLimitrZteamrr�respectLimitxs��z&MemoryPool._pool.<locals>.respectLimit)r�ZcreateWorkerZlogException)r
r�r�)rr�Z
threadFactoryr�rr�r�_poolks
	�zMemoryPool._poolN)r#r$r%r�rr�rrrrr�Msr�c@s eZdZdZdd�Zdd�ZdS)�
PoolHelpera
    A L{PoolHelper} constructs a L{threadpool.ThreadPool} that doesn't actually
    use threads, by using the internal interfaces in L{twisted._threads}.

    @ivar performCoordination: a 0-argument callable that will perform one unit
        of "coordination" - work involved in delegating work to other threads -
        and return L{True} if it did any work, L{False} otherwise.

    @ivar workers: the workers which represent the threads within the pool -
        the workers other than the coordinator.
    @type workers: L{list} of 2-tuple of (L{IWorker}, C{workPerformer}) where
        C{workPerformer} is a 0-argument callable like C{performCoordination}.

    @ivar threadpool: a modified L{threadpool.ThreadPool} to test.
    @type threadpool: L{MemoryPool}
    cs:t�\}�_g�_�fdd�}t||j|f|�|��_dS)z�
        Create a L{PoolHelper}.

        @param testCase: a test case attached to this helper.

        @type args: The arguments passed to a L{threadpool.ThreadPool}.

        @type kwargs: The arguments passed to a L{threadpool.ThreadPool}
        cs�j�t���jddS)Nr�r)r�r rrr!rrr��sz&PoolHelper.__init__.<locals>.newWorkerN)r�performCoordinationr�r�r(r)rZtestCaser�r�r�r�rr!rr�s
��zPoolHelper.__init__cCs|��r
qdS)z�
        Perform all currently scheduled "coordination", which is the work
        involved in delegating work to other threads.
        N)r�r!rrr�performAllCoordination�sz!PoolHelper.performAllCoordinationN)r#r$r%r�rr�rrrrr��sr�c@s eZdZdZdd�Zdd�ZdS)�MemoryBackedTestszn
    Tests using L{PoolHelper} to deterministically test properties of the
    threadpool implementation.
    cCslt|dd�}d}t|�D]}|j�dd��q|��|�|jg�|j��|��|�t|j�|�dS)z�
        If a threadpool is told to do work before starting, then upon starting
        up, it will start enough workers to handle all of the enqueued work
        that it's been given.
        rr6rcSsdSr
rrrrrrk�rlz;MemoryBackedTests.test_workBeforeStarting.<locals>.<lambda>N)	r�rrrEr�r/r�r7r�r�helper�n�xrrr�test_workBeforeStarting�s
z)MemoryBackedTests.test_workBeforeStartingcCspt|dd�}d}t|�D]}|j�dd��q|��|�|jg�|j��|��|�t|j�|jj	�dS)z�
        If the amount of work before starting exceeds the maximum number of
        threads allowed to the threadpool, only the maximum count will be
        started.
        rr6�2cSsdSr
rrrrrrk�rlzBMemoryBackedTests.test_tooMuchWorkBeforeStarting.<locals>.<lambda>N)
r�rrrEr�r/r�r7rr1r�rrr�test_tooMuchWorkBeforeStarting�s
z0MemoryBackedTests.test_tooMuchWorkBeforeStartingN)r#r$r%r�r�r�rrrrr��sr�)r�Z
__future__rrrarrCrIrZtwisted.python.compatrZ
twisted.trialrZtwisted.pythonrrrr	Ztwisted._threadsr
rrBrZsynchronizeZSynchronousTestCaser&r�r.r�r�r�rrrr�<module>s(%
Y6;1