HEX
Server: Apache
System: Linux scp1.abinfocom.com 5.4.0-216-generic #236-Ubuntu SMP Fri Apr 11 19:53:21 UTC 2025 x86_64
User: confeduphaar (1010)
PHP: 8.1.33
Disabled: exec,passthru,shell_exec,system
File: //proc/self/root/lib/python3/dist-packages/twisted/test/__pycache__/test_threadable.cpython-38.pyc
U


W[��@s�dZddlmZmZddlZddlZzddlZWnek
rHdZYnXdZddl	m
Z
ddlmZddl
mZGdd	�d	�Ze�e�Gd
d�dej�ZGdd
�d
ej�ZdS)z)
Tests for L{twisted.python.threadable}.
�)�division�absolute_importNzPlatform lacks thread support)�_PY3)�unittest)�
threadablec@s"eZdZdgZdZdZdd�ZdS)�
TestObject�aMethod����cCsLtd�D]>}|j|j|_|_|j|j|_|jdkstd|jf��qdS)N�
rzz == %d, not 0 as expected)�range�y�x�z�AssertionError)�self�i�r�>/usr/lib/python3/dist-packages/twisted/test/test_threadable.pyrszTestObject.aMethodN)�__name__�
__module__�__qualname__Zsynchronizedrr
rrrrrrsrc@sHeZdZdd�Zdd�Zdd�Zdd�Zed	k	r<ee_ee_d
d�Z	d	S)�SynchronizationTestscCsbtr2ttdd�dk	r^|�tjt���t�d�n,ttdd�dk	r^|�tjt���t�d�dS)z�
        Reduce the CPython check interval so that thread switches happen much
        more often, hopefully exercising more possible race conditions.  Also,
        delay actual test startup until the reactor has been started.
        �getswitchintervalNgH�����z>�getcheckinterval�)r�getattr�sysZ
addCleanup�setswitchintervalr�setcheckintervalr�rrrr�setUp'szSynchronizationTests.setUpcCs|�dtjj�dS)zk
        The name of a synchronized method is unaffected by the synchronization
        decorator.
        rN)ZassertEqualrrrr rrr�test_synchronizedName7sz*SynchronizationTests.test_synchronizedNamecsTt��g�tj�fdd�d�}|��|��|��dd�|�t��d�dS)z�
        L{threadable.isInIOThread} returns C{True} if and only if it is called
        in the same thread as L{threadable.registerAsIOThread}.
        cs��t���S�N)�appendr�isInIOThreadr�Z
foreignResultrr�<lambda>G�z8SynchronizationTests.test_isInIOThread.<locals>.<lambda>��targetrz#Non-IO thread reported as IO threadz#IO thread reported as not IO threadN)	rZregisterAsIOThread�	threading�Thread�start�joinZassertFalseZ
assertTruer%)r�trr&r�test_isInIOThread?s
���z&SynchronizationTests.test_isInIOThreadcslt��g���fdd�}g}td�D]"}tj|d�}|�|�|��q$|D]}|��qL�rht����dS)Nc
sPztd�D]}���q
Wn0tk
rJ}z��t|��W5d}~XYnXdS�Ni�)rrrr$�str)r�e��errors�orr�callMethodLotsUs
zHSynchronizationTests.testThreadedSynchronization.<locals>.callMethodLots�r))	rrr+r,r$r-r.rZFailTest)rr7Zthreadsrr/rr4r�testThreadedSynchronizationPs


z0SynchronizationTests.testThreadedSynchronizationNcCs t�}td�D]}|��qdSr1)rrr)rr6rrrr�testUnthreadedSynchronizationmsz2SynchronizationTests.testUnthreadedSynchronization)
rrrr!r"r0r9�
threadingSkip�skipr:rrrrr&src@s*eZdZdd�Zedk	ree_dd�ZdS)�SerializationTestscCs4t��}t|�}t�|�}t�|�}|�||�dSr#)rZXLock�type�pickle�dumps�loadsZassertIsInstance)r�lockZlockType�
lockPickleZnewLockrrr�testPicklingus


zSerializationTests.testPicklingNcCs(d}t�|�}t�|d�}t�|�dS)Ns6ctwisted.python.threadable
unpickle_lock
p0
(tp1
Rp2
.�)r?rAr@)rrCrBZ	newPicklerrr�testUnpickling�s
z!SerializationTests.testUnpickling)rrrrDr;r<rFrrrrr=tsr=)�__doc__Z
__future__rrrr?r+�ImportErrorr;Ztwisted.python.compatrZ
twisted.trialrZtwisted.pythonrrZsynchronizeZSynchronousTestCaserr=rrrr�<module>s

N