123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134 |
- from test_dropbear import *
- import signal
- import queue
- import socket
- # Tests for various edge cases of SSH channels and connection service
def test_exitcode(request, dropbear):
    """Run ``exit 44`` through dbclient and expect a return code of 44."""
    completed = dbclient(request, "exit 44")
    assert completed.returncode == 44
@pytest.mark.xfail(reason="Not yet implemented", strict=True)
def test_signal(request, dropbear):
    """Expect a command that kills itself with SIGFPE to yield ``-SIGFPE``.

    The test is marked as a strict expected failure ("Not yet implemented"),
    so an unexpected pass is reported as a failure.
    """
    expected_code = -signal.SIGFPE
    completed = dbclient(request, "kill -FPE $$")
    assert completed.returncode == expected_code
@pytest.mark.parametrize("size", [0, 1, 2, 100, 5000, 200_000])
def test_roundtrip(request, dropbear, size):
    """Feed ``size`` random bytes to ``cat`` and expect them back unchanged."""
    payload = os.urandom(size)
    completed = dbclient(request, "cat", input=payload, capture_output=True)
    completed.check_returncode()
    echoed = completed.stdout
    assert echoed == payload
@pytest.mark.parametrize("size", [0, 1, 2, 100, 20001, 41234])
def test_read_pty(request, dropbear, size):
    """Check that data read back through a pty session is not truncated.

    ``size`` alphanumeric characters are first written to a temporary file
    created by the command (no pty involved). The file is then read back with
    ``-t`` and a pty as stdin, and the output must equal what was written.
    """
    # testcase for
    # https://bugs.openwrt.org/index.php?do=details&task_id=1814
    # https://github.com/mkj/dropbear/pull/85
    # From Yousong Zhou
    # Fixed Oct 2021
    #
    #$ ssh -t my.router cat /tmp/bigfile | wc
    #Connection to my.router closed.
    #      0       1   14335    <- should be 20001

    # Write the file. No newlines etc which could confuse ptys
    dat = random_alnum(size)
    r = dbclient(request, "tmpf=`mktemp`; echo $tmpf; cat > $tmpf", input=dat, capture_output=True, text=True)
    # Fail early if the upload did not succeed, before its output is used
    # as a file name in the next command.
    r.check_returncode()
    tmpf = r.stdout.rstrip()

    # Read with a pty, this is what is being tested.
    # Timing/buffering is subtle, we seem to need to cat a file from disk to hit it.
    m, s = pty.openpty()
    try:
        r = dbclient(request, "-t", f"cat {tmpf}; rm {tmpf}", stdin=s, capture_output=True)
    finally:
        # Both ends of the pty are plain file descriptors; release them so
        # they are not leaked across parametrized runs.
        os.close(s)
        os.close(m)
    r.check_returncode()
    assert r.stdout.decode() == dat
@pytest.mark.parametrize("fd", [1, 2])
def test_bg_sleep(request, fd, dropbear):
    """Expect dbclient to return promptly when a background job is left behind.

    The command starts ``sleep 10`` in the background and echoes ``hello`` to
    stdout (``fd == 1``) or stderr (``fd == 2``). The call is limited to a
    2 second timeout, so waiting for the sleep shows up as TimeoutExpired.
    """
    # https://lists.ucc.asn.au/pipermail/dropbear/2006q1/000362.html
    # Rob Landley "Is this a bug?" 24 Mar 2006
    # dbclient user@system "sleep 10& echo hello"
    #
    # It should return right after printing hello, but it doesn't. It waits until
    # the child process exits.

    # failure is TimeoutExpired
    if fd == 1:
        redir = ""
    else:
        redir = " >&2 "
    proc = dbclient(request, f"sleep 10& echo hello {redir}",
                    capture_output=True, timeout=2, text=True)
    proc.check_returncode()

    if fd == 1:
        captured = proc.stdout
    else:
        captured = proc.stderr

    has_hostkey_warning = fd == 2 and 'accepted unconditionally' in captured
    if not has_hostkey_warning:
        assert captured.rstrip() == "hello"
        return
    # ignore hostkey warning, a bit of a hack
    assert captured.endswith("\n\nhello\n")
def test_idle(request, dropbear):
    """Expect ``-I 1`` to end the session before the 2 second call timeout.

    The command prints ``zong`` and then sleeps for 10 seconds; the output
    must still be captured and the return code must indicate success.
    """
    run_options = dict(capture_output=True, timeout=2, text=True)
    completed = dbclient(request, "-I", "1", "echo zong; sleep 10", **run_options)
    completed.check_returncode()
    printed = completed.stdout.rstrip()
    assert printed == "zong"
@pytest.mark.parametrize("size", [1, 4000, 40000])
def test_netcat(request, dropbear, size):
    """Exchange ``size`` random bytes each way using dbclient's ``-B`` option.

    A local ``HandleTcp`` listener on port 3344 is set up with the reply data.
    dbclient's stdout must equal that reply, and the listener must have
    received exactly what was fed to dbclient's stdin. Xfails for remote runs.
    """
    if request.config.option.remote:
        pytest.xfail("don't know netcat address for remote")

    sent = os.urandom(size)
    reply = os.urandom(size)
    with HandleTcp(3344, 1, reply) as listener:
        completed = dbclient(request, "-B", "localhost:3344", input=sent, capture_output=True)
        completed.check_returncode()
        assert completed.stdout == reply
        assert listener.inbound() == sent
@pytest.mark.parametrize("size", [1, 4000, 40000])
@pytest.mark.parametrize("fwd_flag", "LR")
def test_tcpflushout(request, dropbear, size, fwd_flag):
    """ Tests that an opened TCP connection prevents a SSH session from being closed
    until that TCP connection has finished transferring.

    Runs once per forwarding flag (``-L`` and ``-R``) and per payload ``size``.
    Xfails for remote runs, since the forwarding address is not known.
    """
    opt = request.config.option
    if opt.remote:
        pytest.xfail("don't know address for remote")

    dat1 = os.urandom(size)
    dat2 = os.urandom(size)
    q = queue.Queue()
    with HandleTcp(3344, timeout=1, response=q) as tcp:

        r = dbclient(request, f"-{fwd_flag}", "7788:localhost:3344", "sleep 0.1; echo -n done",
            text=True, background=True, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
        # time to let the listener start
        time.sleep(0.1)
        # open a tcp connection; the context manager closes the socket even
        # when one of the assertions below fails
        with socket.create_connection(("localhost", 7788)) as c:

            # wait for the shell to finish. sleep a bit longer in case it exits.
            assert r.stdout.read(4) == "done"
            time.sleep(0.1)

            # now the shell has finished, we can write on the tcp socket
            c.sendall(dat2)
            c.shutdown(socket.SHUT_WR)
            q.put(dat1)

            # return a tcp response
            q.put(None)
            # check hasn't exited
            assert r.poll() is None

            # read the response
            assert readall_socket(c) == dat1

        assert tcp.inbound() == dat2
        # check has exited, allow time for dbclient to exit
        time.sleep(0.1)
        assert r.poll() == 0
|