File: //usr/lib/python3/dist-packages/twisted/conch/test/__pycache__/test_recvline.cpython-38.pyc
U


W[>c�@s�dZddlZddlZddlmZddlmZddlmZm	Z	m
Z
ddlmZm
Z
ddlmZddlmZmZdd	lmZdd
lmZddlmZe��r�eej�Zej�ej�ed<n"e
�Zej�ej�� e�!��ed
<Gdd�dej"�Z#ddlm$Z$ddlm%Z%ddl&m'Z'Gdd�dej(�Z)dZ*dZ+dZ,dZ-dZ.dZ/dZ0dZ1dZ2ddlm3Z3z@ddl4m5Z5m6Z6m7Z7m8Z8m9Z9m:Z:dd l;m<Z<m=Z=m>Z>m?Z?m@Z@WneAk
�r�d!ZBYn�Xd"ZBGd#d$�d$e7jC�ZDGd%d&�d&e8jE�ZFGd'd(�d(e5jG�ZHGd)d*�d*e6jI�ZJGd+d,�d,e?�ZKGd-d.�d.e=�ZLGd/d0�d0e<�ZMe	�NeLeMe9jO�Gd1d2�d2e%jP�ZQGd3d4�d4�ZRGd5d6�d6eR�ZSdd7lTmUZUGd8d9�d9ejVeUjW�ZXGd:d;�d;ejYeUjW�ZZGd<d=�d=eR�Z[zdd>lm\Z\WneAk
�r�dZ\YnXGd?d@�d@eR�Z]GdAdB�dB�Z^GdCdD�dDe[ej"e^�Z_GdEdF�dFeSej"e^�Z`GdGdH�dHe]ej"e^�ZaGdIdJ�dJ�ZbGdKdL�dLe[ej"eb�ZcGdMdN�dNeSej"eb�ZdGdOdP�dPe]ej"eb�ZeGdQdR�dRej"�ZfdS)SzU
Tests for L{twisted.conch.recvline} and fixtures for testing related
functionality.
�N)�insults)�recvline)�reflect�
components�filepath)�	iterbytes�bytesEnviron)�platform)�defer�error)�unittest)�portal)�StringTransport�
PYTHONPATHs
PYTHONPATHc@sleZdZdd�Zdd�Zdd�Zdd�Zd	d
�Zdd�Zd
d�Z	dd�Z
dd�Zdd�Zdd�Z
dd�ZdS)�ArrowsTestscsFt��_t���_t���_�fdd��j_��j_	�j�
�j�dS)Ncs�jS�N)�p���selfr�B/usr/lib/python3/dist-packages/twisted/conch/test/test_recvline.py�<lambda>'�z#ArrowsTests.setUp.<locals>.<lambda>)rZunderlyingTransportr�ServerProtocol�ptr�HistoricRecvLiner�protocolFactory�factory�makeConnectionrrrr�setUp#s

zArrowsTests.setUpcCs@|j�dd�|j�dd�|j�dd�|�|j��d�dS)zy
        When L{HistoricRecvLine} receives a printable character,
        it adds it to the current line buffer.
        �xN�y�z��xyzr)r�keystrokeReceived�assertEqual�currentLineBufferrrrr�test_printableCharacters,sz$ArrowsTests.test_printableCharacterscsF�fdd�}td�D]}||�q���j��d�|�jj����j��d�|�jj����j��d�|�jj����j��d�|�jj����j��d�|�jj����j��d�|�jj����j��d�|�jj����j��d�|�jj����j��d�|�jj����j��d�dS)	z�
        When L{HistoricRecvLine} receives a LEFT_ARROW or
        RIGHT_ARROW keystroke it moves the cursor left or right
        in the current line buffer, respectively.
        cs�j�|d�Sr�rr%��chrrrr>rz3ArrowsTests.test_horizontalArrows.<locals>.<lambda>r$r#)�xyr")r syz)rr$N)rr&rr'rZRIGHT_ARROW�
LEFT_ARROW�r�kRr+rrr�test_horizontalArrows8s,
z!ArrowsTests.test_horizontalArrowscs|�fdd�}td�D]}||�q���j��d�|d�|d�|d����j��d�|d����j��d	�d
S)z�
        When L{HistoricRecvLine} receives a newline, it adds the current
        line buffer to the end of its history buffer.
        cs�j�|d�Srr)r*rrrrerz*ArrowsTests.test_newline.<locals>.<lambda>�xyz
abc
123
��r$�abc�123r�c�b�a�
))r$r4r5scbarN)rr&r�currentHistoryBufferr.rrr�test_newline`s 
���zArrowsTests.test_newlinecs8�fdd�}td�D]}||�q���j��d����j��d�|�jj����j��d����j��d�|�jj����j��d����j��d	�|�jj����j��d
����j��d�|�jj����j��d
����j��d�td�D]}|�jj��q���j��d�d
S)a
        When L{HistoricRecvLine} receives UP_ARROW or DOWN_ARROW
        keystrokes it moves the current index in the current history
        buffer up or down, and resets the current line buffer to the
        previous or next line in history, respectively for each.
        cs�j�|d�Srr)r*rrrrrz1ArrowsTests.test_verticalArrows.<locals>.<lambda>r1r2�rr))r$r4)r5)r5r))r$)r4r5)r4r)rr3r#�N)	rr&rr:r'rZUP_ARROW�rangeZ
DOWN_ARROW)rr/r+�irrr�test_verticalArrowsxs@
������zArrowsTests.test_verticalArrowscsV�fdd�}td�D]}||�q���j��d�|�jj����j��d�dS)z�
        When L{HistoricRecvLine} receives a HOME keystroke it moves the
        cursor to the beginning of the current line buffer.
        cs�j�|d�Srr)r*rrrr�rz'ArrowsTests.test_home.<locals>.<lambda>�hello, world�rAr)rrAN)rr&rr'r�HOMEr.rrr�	test_home�s
zArrowsTests.test_homecsb�fdd�}td�D]}||�q���j��d�|�jj�|�jj����j��d�dS)z�
        When L{HistoricRecvLine} receives an END keystroke it moves the cursor
        to the end of the current line buffer.
        cs�j�|d�Srr)r*rrrr�rz&ArrowsTests.test_end.<locals>.<lambda>rArBN)rr&rr'rrCZENDr.rrr�test_end�s
zArrowsTests.test_endcs��fdd�}td�D]}||�q���j��d�|�jj����j��d�|�jj�|�jj����j��d�|�jj����j��d�dS)z�
        When L{HistoricRecvLine} receives a BACKSPACE keystroke it deletes
        the character immediately before the cursor.
        cs�j�|d�Srr)r*rrrr�rz,ArrowsTests.test_backspace.<locals>.<lambda>r$r#�r,r)rr!N)rr&rr'rZ	BACKSPACEr-r.rrr�test_backspace�s
zArrowsTests.test_backspacecs��fdd�}td�D]}||�q���j��d�|�jj����j��d�|�jj�|�jj����j��d�|�jj�|�jj����j��d�|�jj�|�jj����j��d�|�jj����j��d�dS)	z�
        When L{HistoricRecvLine} receives a DELETE keystroke, it
        deletes the character immediately after the cursor.
        cs�j�|d�Srr)r*rrrr�rz)ArrowsTests.test_delete.<locals>.<lambda>r$r#rF)r rr<N)rr&rr'rZDELETEr-r.rrr�test_delete�s"
zArrowsTests.test_deletecsr�fdd�}td�D]}||�q|�jj�|d����j��d�|�jj�|d����j��d�dS)	z�
        When not in INSERT mode, L{HistoricRecvLine} inserts the typed
        character at the cursor before the next character.
        cs�j�|d�Srr)r*rrrr�rz)ArrowsTests.test_insert.<locals>.<lambda>r$�A)�xyAr"�B)�xyBsAzN)rrr-r&rr'r.rrr�test_insert�s
zArrowsTests.test_insertcs~�fdd�}td�D]}||�q|�jj�|�jj�|d����j��d�|�jj�|d����j��d�dS)	a
        When in INSERT mode and upon receiving a keystroke with a printable
        character, L{HistoricRecvLine} replaces the character at
        the cursor with the typed character rather than inserting before.
        Ah, the ironies of INSERT mode.
        cs�j�|d�Srr)r*rrrrrz+ArrowsTests.test_typeover.<locals>.<lambda>r$rI)rJrrK)rLrN)rrZINSERTr-r&rr'r.rrr�
test_typeovers
zArrowsTests.test_typeovercsr�fdd�}�j}|j|j|j|j|j|j|j|j|j	|j
|j|j|j
|jfD]}||����j��d�qNdS)z�
        When L{HistoricRecvLine} receives a keystroke for an unprintable
        function key with no assigned behavior, the line buffer is unmodified.
        cs�j�|d�Srr)r*rrrr$rz8ArrowsTests.test_unprintableCharacters.<locals>.<lambda>r<N)rZF1ZF2ZF3ZF4ZF5ZF6ZF7ZF8ZF9ZF10ZF11ZF12ZPGUPZPGDNr&rr')rr/rr+rrr�test_unprintableCharacterss �z&ArrowsTests.test_unprintableCharactersN)�__name__�
__module__�__qualname__rr(r0r;r@rDrErGrHrMrNrOrrrrr"s	(*r)�telnet)�helper)�
LoopbackRelayc@seZdZdd�ZdS)�
EchoServercCs |j�|d|j|j�dS)Nr9)Zterminal�writeZpsZpn)r�linerrr�lineReceived3szEchoServer.lineReceivedN)rPrQrRrYrrrrrV2srVsssss[2~s[1~s[3~s[4~�)�checkers)�userauth�	transport�channel�
connection�session�keys)�TerminalUser�TerminalSession�
TerminalRealm�TerminalSessionTransport�ConchFactoryFTc@s0eZdZdZdd�Zdd�Zdd�Zdd	�Zd
S)�SessionChannelssessioncOs6tjj|f|�|�||_||_||_||_||_dSr)r^�
SSHChannel�__init__r�protocolArgs�protocolKwArgs�width�height�rrrjrkrlrm�a�kwrrrriNszSessionChannel.__init__cCsdt�d|j|jddfd�}|j�|d|�|j�|dd�|j|j|j�|_	||j	_
|j	�|�dS)Nsvt102rrspty-reqsshell)r`ZpackRequest_pty_reqrmrl�connZsendRequestrrjrk�_protocolInstancerr)r�dataZtermrrr�channelOpenYszSessionChannel.channelOpencCs|j�t���dSr)rr�connectionLostrZConnectionDonerrrr�closedcszSessionChannel.closedcCs|j�|�dSr)rr�dataReceived�rrsrrrrwgszSessionChannel.dataReceivedN)rPrQrR�namerirtrvrwrrrrrgKs

rgc@s$eZdZdd�Zdd�Zdd�ZdS)�TestConnectioncOs6tjj|f|�|�||_||_||_||_||_dSr)r_�
SSHConnectionrirrjrkrlrmrnrrrrilszTestConnection.__init__cCs,t|j|j|j|j|j�|_|�|j�dSr)rgrrjrkrlrm�_TestConnection__channelZopenChannelrrrr�serviceStartedwszTestConnection.serviceStartedcCs|j�|�Sr)r|rWrxrrrrW|szTestConnection.writeN)rPrQrRrir}rWrrrrrzksrzc@seZdZdd�Zdd�ZdS)�TestAuthcOs tjj||f|�|�||_dSr)r\�SSHUserAuthClientri�password)r�usernamer�rorprrrri�szTestAuth.__init__cCst�|j�Sr)r
�succeedr�rrrr�getPassword�szTestAuth.getPasswordN)rPrQrRrir�rrrrr~�sr~c@s,eZdZdd�Zdd�Zdd�Zdd�Zd	S)
�
TestTransportc
Os.||_||_||_||_||_||_||_dSr)rrjrkr�r�rlrm)
rrrjrkr�r�rlrmrorprrrri�szTestTransport.__init__cCs
t�d�S)NT)r
r�)rZhostKeyZfingerprintrrr�
verifyHostKey�szTestTransport.verifyHostKeycCs8t|j|j|j|j|j�|_|�t|j	|j
|j��dSr)rzrrjrkrlrm�_TestTransport__connectionZrequestServicer~r�r�rrrr�connectionSecure�s�zTestTransport.connectionSecurecCs|j�|�Sr)r�rWrxrrrrW�szTestTransport.writeN)rPrQrRrir�r�rWrrrrr��s
r�c@seZdZdd�ZdS)�TestSessionTransportcCs|jjjj��Sr)Zavatarrqr]r�serverProtocolrrrrr�sz$TestSessionTransport.protocolFactoryN)rPrQrRrrrrrr��sr�c@seZdZeZdS)�TestSessionN)rPrQrRr�ZtransportFactoryrrrrr��sr�c@seZdZdS)�TestUserN�rPrQrRrrrrr��sr�c@s$eZdZdd�Zdd�Zdd�ZdS)�NotifyingExpectableBuffercCst��|_t��|_dSr)r
ZDeferred�onConnection�onDisconnectionrrrrri�s
z"NotifyingExpectableBuffer.__init__cCstj�|�|j�|�dSr)rT�ExpectableBuffer�connectionMader��callbackrrrrr��sz(NotifyingExpectableBuffer.connectionMadecCs|j�|�dSr)r�Zerrback)r�reasonrrrru�sz(NotifyingExpectableBuffer.connectionLostN)rPrQrRrir�rurrrrr��sr�c@s$eZdZdZdZdd�Zdd�ZdS)�
_BaseMixin�P�c
Cs�|j����}|dg|jt|�d}|�t|�t|��tt|��D]Z}|�||||d�|td|d�|d��dd�|td|d�|d���qJdS)Nr�rs != )	�recvlineClient�	__bytes__�
splitlines�HEIGHT�lenr&r>�join�max)r�linesZ
receivedLinesZ
expectedLinesr?rrr�
_assertBuffer�s���z_BaseMixin._assertBuffercs.�j�d�}��|���fdd�}|�|�S)Nsdonecs����dSr)r�)Zign��outputrrr�finished�sz)_BaseMixin._trivialTest.<locals>.finished)r��expect�
_testwriteZaddCallback)rZ	inputLiner�Zdoner�rr�r�_trivialTest�s
z_BaseMixin._trivialTestN)rPrQrR�WIDTHr�r�r�rrrrr��sr�c@seZdZdd�Zdd�ZdS)�	_SSHMixincs@tst�d��d\}}t�}t|_�fdd�|_t��}|�	||�t
�|�}|�|�t
|�}tjt�|���dd�}||jd<||jd<|j|_|��|���t��fdd���|�d�}t|�}	t��t��fd	d���t�fd
d�di|||j|j�}
t|
�}|
� |	�|� |��|_!|
|_"||_#|	|_$||_%�j&S)NzMcryptography requirements missing, can't run historic recvline tests over ssh)stestuserstestpasscs�Srrr��
insultsServerrrr�rz!_SSHMixin.setUp.<locals>.<lambda>i)ZkeySizesssh-rsacs�Srrr��recvlineServerrrr�rcs�Srrr�r�rrr�rcs�Srrr��
insultsClientrrrrr)'�sshrZSkipTestrdr�ZuserFactoryZchainedProtocolFactoryr[Z'InMemoryUsernamePasswordDatabaseDontUseZaddUserr
ZPortalZregisterCheckerrfraZ_getPersistentRSAKeyrZFilePathZmktempZ
publicKeysZprivateKeysr�ZstartFactoryrrZ
buildProtocolrUr��ClientProtocolr�r�r�rr��	sshClient�	sshServer�clientTransport�serverTransportr�)r�urZrlmZcheckerZptlZ
sshFactoryZsshKeyr�r�r�r�r�r�r�r�r�rr�sH�

�


 

z_SSHMixin.setUpcCs|j�|�dSr)r�rWrxrrrr�sz_SSHMixin._testwriteN�rPrQrRrr�rrrrr��s/r�)�test_telnetc@seZdZdS)�TestInsultsClientProtocolNr�rrrrr�sr�c@seZdZdS)�TestInsultsServerProtocolNr�rrrrr�sr�c@seZdZdd�Zdd�ZdS)�_TelnetMixincs�|���t�fdd���t��fdd��}t|�}t��t�fdd���t��fdd��}t|�}|�|�|�|�|��|���|_	||_
||_||_�j
S)Ncs�Srrrr�rrr%rz$_TelnetMixin.setUp.<locals>.<lambda>cs�Srrrr�rrr&rcs�Srrrr�rrr*rcs�Srrrr�rrr+r)r�r�rSZTelnetTransportrUr�r�rZclearBufferr��telnetClientr�r�r�)rZtelnetServerr�r�r�rr�rr#s"

z_TelnetMixin.setUpcCs|j�|�dSr)r�rWrxrrrr�<sz_TelnetMixin._testwriteNr�rrrrr�"sr�)�stdioc@s$eZdZdd�Zdd�Zdd�ZdS)�_StdioMixincs�t��t��fdd��}t�|�}tj}tj}|�d�sB|�d�rN|dd�}||t	�
|j�g}t�
�svdd�|D�}dd	lm}|j|||td
d�}�|_|_||_||_t�td|j��d�g��S)
Ncs�Srrr��testTerminalrrrQrz#_StdioMixin.setUp.<locals>.<lambda>z.pycz.pyo���cSsg|]}|�t����qSr)�encode�sys�getfilesystemencoding)�.0�argrrr�
<listcomp>asz%_StdioMixin.setUp.<locals>.<listcomp>r)�reactorT)�envZusePTYs>>> )r�rr�r�ZTerminalProcessProtocolr��
executable�__file__�endswithrZqualr�r	�	isWindows�twisted.internetr�ZspawnProcess�	properEnvr�r��
processClientr�r
Z
gatherResults�filterr�r�)rr�r��exe�module�argsr�r�rr�rrGs,

��z_StdioMixin.setUpc	sFz�j�d�Wntjtfk
r*YnX�fdd�}�jj�|�S)NZKILLcs.|�tj���|jj���|jjd�dS)N�	)�traprZProcessTerminatedZassertIsNone�valueZexitCoder&Zstatus)Zfailurerrrr�ysz"_StdioMixin.tearDown.<locals>.trap)r�Z
signalProcessrZProcessExitedAlready�OSErrorr�r�Z
addErrback)rr�rrr�tearDownssz_StdioMixin.tearDowncCs|j�|�dSr)r�rWrxrrrr��sz_StdioMixin._testwriteN)rPrQrRrr�r�rrrrr�Fs,
r�c@sXeZdZeZdd�Zdd�Zdd�Zdd�Zd	d
�Z	dd�Z
d
d�Zdd�Zdd�Z
dS)�RecvlineLoopbackMixincCs|�ddddg�S)Nsfirst line
done�>>> first line�
first line�>>> done)r�rrrr�
testSimple�s��z RecvlineLoopbackMixin.testSimplecCs"|�tdtdddddg�S)Nr�r=�	xxxx
dones>>> first xxxxs
first xxxxr�)r��insert�leftrrrr�
testLeftArrow�s��z#RecvlineLoopbackMixin.testLeftArrowcCs*|�tdtdtdddddg�S)Ns
right liner=�sxx
dones>>> right lixxs
right lixxr�)r�r�r��rightrrrr�testRightArrow�s��z$RecvlineLoopbackMixin.testRightArrowcCs|�dtdddddg�S)N�second liner=r�s>>> second xxxxssecond xxxxr�)r��	backspacerrrr�
testBackspace�s��z#RecvlineLoopbackMixin.testBackspacecCs&|�dtdtdddddg�S)Nsdelete xxxxr=�	line
dones>>> delete linesdelete liner�)r�r��deleterrrr�
testDelete�s��z RecvlineLoopbackMixin.testDeletecCs|�dtdddddg�S)Ns	third ine��l
dones>>> third lines
third liner�)r�r�rrrr�
testInsert�s��z RecvlineLoopbackMixin.testInsertcCs"|�dtdtddddg�S)Nsfourth xiner=r�s>>> fourth linesfourth liner�)r�r�r�rrrr�testTypeover�s��z"RecvlineLoopbackMixin.testTypeovercCs|�tdtddddg�S)Ns	blah lines	home
dones
>>> home lines	home liner�)r�r��homerrrr�testHome�s��zRecvlineLoopbackMixin.testHomecCs"|�dtdtddddg�S)Nsend r=r�s>>> end linesend liner�)r�r��endrrrr�testEnd�s��zRecvlineLoopbackMixin.testEndN)rPrQrRrVr�r�r�r�r�r�r�r�r�r�rrrrr��sr�c@seZdZdS)�RecvlineLoopbackTelnetTestsNr�rrrrr��sr�c@seZdZdS)�RecvlineLoopbackSSHTestsNr�rrrrr��sr�c@seZdZedkrdZdS)�RecvlineLoopbackStdioTestsNzBTerminal requirements missing, can't run recvline tests over stdio�rPrQrRr��skiprrrrr��sr�c@s(eZdZeZdd�Zdd�Zdd�ZdS)�HistoricRecvlineLoopbackMixincCs|�dtddddddg�S)Nsfirst line
�
doner�r�r�)r��uprrrr�testUpArrow�s
��z)HistoricRecvlineLoopbackMixin.testUpArrowcCs"|�dttddddddg�S)aH
        Pressing down arrow to visit an entry that was added to the
        history by pressing the up arrow instead of return does not
        raise a L{TypeError}.

        @see: U{http://twistedmatrix.com/trac/ticket/9031}

        @return: A L{defer.Deferred} that fires when C{b"done"} is
            echoed back.
        sfirst line
partial liner�r�r�s>>> partial linespartial liner��r�r��downrrrr�$test_DownArrowToPartialLineInHistory�s��zBHistoricRecvlineLoopbackMixin.test_DownArrowToPartialLineInHistoryc
Cs*|�dtdtddddddddg�S)	Nsfirst line
second line
r�r�r�r�s>>> second liner�r�r�rrrr�
testDownArrows��z+HistoricRecvlineLoopbackMixin.testDownArrowN)rPrQrRrVr�r�rrrrrrr��s
r�c@seZdZdS)�#HistoricRecvlineLoopbackTelnetTestsNr�rrrrrsrc@seZdZdS)� HistoricRecvlineLoopbackSSHTestsNr�rrrrrsrc@seZdZedkrdZdS)�"HistoricRecvlineLoopbackStdioTestsNzKTerminal requirements missing, can't run historic recvline tests over stdior�rrrrrsrc@seZdZdZdd�ZdS)�TransportSequenceTestsz5
    L{twisted.conch.recvline.TransportSequence}
    cCs|�ttj�dS)zh
        Initializing a L{recvline.TransportSequence} with no args
        raises an assertion.
        N)ZassertRaises�AssertionErrorrZTransportSequencerrrr�test_invalidSequence%sz+TransportSequenceTests.test_invalidSequenceN)rPrQrR�__doc__rrrrrr sr)gr�osr�Ztwisted.conch.insultsrZ
twisted.conchrZtwisted.pythonrrrZtwisted.python.compatrrZtwisted.python.runtimer	r�r
rZ
twisted.trialrZtwisted.credr
Ztwisted.test.proto_helpersrr��dict�environr��pathsepr��pathr�r�ZTestCaserrSrTZtwisted.conch.test.loopbackrUrrVr�r�r�r�r�r�r�r�r�r[Ztwisted.conch.sshr\r]r^r_r`raZtwisted.conch.manhole_sshrbrcrdrerf�ImportErrorr�rhrgr{rzrr~ZSSHClientTransportr�r�r�r�ZregisterAdapterZISessionr�r�r�r�Ztwisted.conch.testr�r�ZTestProtocolr�rr�r�r�r�r�r�r�r�r�rrrrrrrr�<module>s�
�  
 
5��
?L/