前言

SSH(Secure Shell)协议是一种加密的网络传输协议,使得在不安全的网络环境中可以安全的执行远程登录、远程命令执行和数据传输等操作。

SSH 协议的组成可以大致分成三个模块:

img

  1. 传输层协议(Transport Layer Protocol):传输层协议负责在客户端和服务器之间建立安全的连接。它提供了一种可靠的数据流,用于在不安全的网络环境中传输加密的数据。传输层协议还负责进行密钥交换、协商加密算法和数据完整性检查等操作。
  2. 用户认证协议(User Authentication Protocol):用户认证协议用于验证客户端用户的身份。在SSH 连接建立之后,客户端需要提供有效的认证凭据(如用户名和密码、公钥等),以证明其具有访问服务器的权限。SSH 支持多种认证方法,包括基于密码的认证、基于公钥的认证和基于其他安全令牌的认证等。
  3. 连接协议(Connection Protocol):连接协议负责在已建立的安全连接上提供多种服务,如交互式 Shell、远程命令执行、端口转发和文件传输等。连接协议允许客户端在同一个 SSH 连接上同时运行多个会话,每个会话可以使用不同的服务。

传输层协议比较偏向于基础设施类协议,本文中我们所说的定制集中在用户认证协议和连接层协议的定制。

本文中的所有代码基于 Python3 + asyncssh 库进行实现。

关于 asyncssh 库

在基于 asyncssh 库实现 ssh 服务时,应用程序需要继承并实现 SSHServer 类。

常见的需要重载的方法如:

  • connection_made
  • begin_auth
  • connection_lost
  • xxxx_supported
  • validate_xxxx

SSHServer 类主要控制客户端与SSH服务之间的认证行为。

在认证完成后,客户端与服务之间的连接行为,需要通过 SSHServerSession 类来完成,业务需要继承这个类并重载相关方法如:

  • connection_made
  • pty_requested
  • shell_requested
  • exec_requested
  • subsystem_requested
  • session_started
  • data_received

还有一点需要注意的是,在阅读 asyncssh 的文档以及源码时,发现有部分错误,详情可以参考:关于 SSHServerSession 的 Issueimg

用户认证协议的定制

本文主要侧重于三种比较常见的认证方式:

  • 用户名密码认证
  • 公钥认证
  • 交互式认证

我们在使用 ssh 命令行时,可以指定认证方式的优先级:

1
ssh -o PreferredAuthentications=none,keyboard-interactive,publickey,password user@example.com

用户名密码认证

用户名密码认证主要有两方面的功能:

  1. 校验当前用户名与密码是否正确
  2. 校验当前用户名的密码是否已经过期,如果已经过期则需要改密
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import asyncio
import json
import sys
import time
from typing import Any, Dict, Optional

import asyncssh
from asyncssh.misc import MaybeAwait, PasswordChangeRequired, PermissionDenied

class demo_ssh_server(asyncssh.SSHServer):
    def __init__(self):
        self.AUTH_MAX_RETRIES: int = 2
        self.CHANGE_MAX_RETRIES: int = 2
        self.password_retried: int = 0
        self.update_password_retried: int = 0
        self.connected_time = ''
        self.auth_method = 'none'
        self.auth_username = ''
        self.auth_password = ''
        self.auth_new_password = ''
        self.auth_success: bool = False

    def connection_made(self, conn: asyncssh.SSHServerConnection) -> None:
        self.connected_time = time.strftime('%Y-%m-%d %H:%M:%S %p %A %B')

    def session_requested(self):
        return demo_ssh_session(self)

    def connection_lost(self, exc: Optional[Exception]) -> None:
        if exc:
            print('SSH connection error: ' + str(exc), file=sys.stderr)
        else:
            print('SSH connection closed.')

    def begin_auth(self, username: str) -> bool:
        print('begin_auth, username:{}'.format(username))
        return True  # 需要认证

    def password_auth_supported(self) -> bool:
        return True  # 开启密码认证

    def validate_password(self, username: str, password: str) -> MaybeAwait[bool]:
        self.auth_method = 'password'
        self.auth_username = username
        self.auth_password = password
        
        print('username:{}, password:{}, auth password retried:{}, MAX LIMIT:{}'.format(
            username, password, self.update_password_retried, self.AUTH_MAX_RETRIES))
        
        # 如果密码重试次数已经超过了最大限度,则返回失败
        if self.password_retried > self.AUTH_MAX_RETRIES or self.update_password_retried > self.CHANGE_MAX_RETRIES:
            raise PermissionDenied(reason='demo ssh server, password auth failed, user:[{}]'.format(username))
        
        # 用户名不为 test 和change 时,认为校验失败
        if username != 'test' and username != 'change':
            self.password_retried += 1
            return False
        
        # 校验 username 和password 是否匹配,如果匹配则返回 True
        # 当用户名为 test 时,密码必须为 test
        if username == 'test' and password != 'test':
            self.password_retried += 1
            return False
        
        # 当用户名为 change 时,要求客户端更改密码
        if username == 'change' and len(password) > 0:
            self.password_retried += 1
            raise PasswordChangeRequired(
                prompt='change your password for username:[{}]'.format(username))
        
        self.auth_success = True
        return True

    def change_password(self, username: str, old_password: str, new_password: str) -> MaybeAwait[bool]:
        print('update password retried:{}, MAX LIMIT:{}'.format(
            self.update_password_retried, self.CHANGE_MAX_RETRIES))
        
        # 如果重置密码的次数超过上限则返回 False
        if self.update_password_retried > self.CHANGE_MAX_RETRIES:
            raise PermissionDenied(
                reason='demo ssh server, update password failed, user:[{}]'.format(username))
        
        # 如果老的密码和新的密码一致 或 新密码为空,则要求再次改变密码
        if old_password == new_password or len(new_password) <= 0:
            self.update_password_retried += 1
            raise PasswordChangeRequired('retry change password for [{}]'.format(username))
        
        self.auth_new_password = new_password
        # 密码认证且更新了密码
        self.auth_method = 'password and updated'
        self.auth_success = True
        return True

    def auth_completed(self) -> None:
        print('demo ssh server, auth_method:{}, '
              'auth_username:{}, auth_password:{}, '
              'auth_pwd_retried:{}, changed_pwd_retried:{}, '
              'new_password:{}, auth_result:{}'.format(
                  self.auth_method, self.auth_username, self.auth_password,
                  self.password_retried, self.update_password_retried,
                  self.auth_new_password, self.auth_success))
        pass

class demo_ssh_session(asyncssh.SSHServerSession):
    def __init__(self, server: demo_ssh_server):
        self.server: demo_ssh_server = server

    def connection_made(self, channel):
        super().connection_made(channel)
        ret: Dict[str, Any] = {
            'auth_method': self.server.auth_method,
            'auth_username': self.server.auth_username,
            'auth_password': self.server.auth_password,
            'password_retried': self.server.password_retried,
            'update_password_retried': self.server.update_password_retried,
            'auth_new_password': self.server.auth_new_password,
            'auth_succeed': self.server.auth_success,
        }
        data = json.dumps(ret, ensure_ascii=False, sort_keys=True)
        # 当密码校验成功后,客户端与服务端建立了 session,此时回写登录信息给客户端
        channel.write("{}\n".format(data))
        channel.exit(0)

async def start_server() -> None:
    await asyncssh.create_server(
        server_factory=demo_ssh_server,
        host='127.0.0.1',
        port=18822,
        server_version='DEMO-TEST-AsyncSSH_2.13.1',  # 自定义服务器 SSH 协议版本名称
        server_host_keys=['/Users/cyx/.ssh/devcloud_ssh/devcloud_id_rsa'],
    )
    await asyncio.Event().wait()  # 添加一个永远不会触发的事件,使服务器保持运行状态

if __name__ == '__main__':
    try:
        print(asyncssh.__version__)
        asyncio.run(start_server())
    except (OSError, asyncssh.Error) as exc:
        sys.exit('Error starting server: ' + str(exc))

测试结果:

  • 当使用错误的用户名登陆时:认证失败

    img

  • 当使用正确的用户名,先输错密码,最后输入正确密码时:认证成功

    img

  • 使用需要改密的用户名时:会提示更改密码

    img

交互式认证

交互式认证的流程为:先向用户给出提示,获得客户端的输入,最后根据用户的输入判断认证是否成功。这里可以结合双因子认证来做,比如结合短信验证码或小程序OTP码等。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import asyncio
import sys
from typing import Optional

import asyncssh
from asyncssh.auth import KbdIntChallenge, KbdIntResponse
from asyncssh.misc import MaybeAwait
"""
ssh -o PreferredAuthentications=keyboard-interactive user@example.com
"""
class demo_ssh_server(asyncssh.SSHServer):
    def __init__(self):
        self.first_auth = False
        self.second_auth = False
        pass

    def connection_lost(self, exc: Optional[Exception]) -> None:
        if exc:
            print('SSH connection error: ' + str(exc), file=sys.stderr)
        else:
            print('SSH connection closed.')

    def begin_auth(self, username: str) -> MaybeAwait[bool]:
        return True  # 需要认证

    def kbdint_auth_supported(self) -> bool:
        return True  # 支持键盘交互式认证

    def get_kbdint_challenge(self, username: str, lang: str, submethods: str) -> MaybeAwait[KbdIntChallenge]:
        self.first_auth = True
        print('username:{}, lang:{}, submethods:{}'.format(username, lang, submethods))
        # 挑战名称、说明、语言标签、[(提示符字符串、是否回显)]
        return ('ChallengeName', 'ChallengeDesc', 'Tag',
                [('Input something with echo:', True),
                 ('Input something without echo:', False)])

    def validate_kbdint_response(self, username: str, responses: KbdIntResponse) -> MaybeAwait[KbdIntChallenge]:
        if self.first_auth and not self.second_auth:
            print('username:{}, responses:{}'.format(username, responses))
            self.second_auth = True
            return ('Second ChallengeName', 'Second ChallengeDesc', 'Second Tag',
                    [('Input something AGAIN with echo:', True),
                     ('Input something AGAIN without echo:', False)])
        
        if self.first_auth and self.second_auth:
            print('AGAIN username:{}, responses:{}'.format(username, responses))
            return True
        
        return False

async def start_server() -> None:
    await asyncssh.create_server(
        server_factory=demo_ssh_server,
        host='127.0.0.1',
        port=18822,
        server_version='DEMO-TEST-AsyncSSH_2.13.1',
        server_host_keys=['/Users/cyx/.ssh/devcloud_ssh/devcloud_id_rsa'],
    )
    await asyncio.Event().wait()

if __name__ == '__main__':
    try:
        print(asyncssh.__version__)
        asyncio.run(start_server())
    except (OSError, asyncssh.Error) as exc:
        sys.exit('Error starting server: ' + str(exc))

img

公钥认证方式

公钥认证方式指的是,客户端提前将公钥种到了服务器中(.ssh/authorized_keys),然后客户端在 ssh 登录时,会直接和 ssh 服务器验证公钥的合法性。当然,ssh 服务器也可以通过解析 ssh 客户端发过来的公钥数据,进行实时的放行或拦截。

基于authorized_keys文件进行的 ssh 公钥认证

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import asyncio
import sys
from typing import Optional

import asyncssh

class demo_ssh_server(asyncssh.SSHServer):
    def connection_lost(self, exc: Optional[Exception]) -> None:
        if exc:
            print('SSH connection error: ' + str(exc), file=sys.stderr)
        else:
            print('SSH connection closed.')

    def public_key_auth_supported(self) -> bool:
        return True

async def start_server() -> None:
    await asyncssh.create_server(
        server_factory=demo_ssh_server,
        host='127.0.0.1',
        port=18822,
        server_version='DEMO-TEST-AsyncSSH_2.13.1',
        server_host_keys=['/Users/cyx/.ssh/devcloud_ssh/devcloud_id_rsa'],
        authorized_client_keys='/Users/cyx/.ssh/authorized_keys'
    )
    await asyncio.Event().wait()

if __name__ == '__main__':
    try:
        print(asyncssh.__version__)
        asyncio.run(start_server())
    except (OSError, asyncssh.Error) as exc:
        sys.exit('Error starting server: ' + str(exc))

此时服务器中的公钥文件为:

img

客户端分别使用合法的公钥以及非法的公钥访问时,会得到:

img

ssh服务器自动解析客户端发送来的公钥

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import asyncio
import hashlib
import sys
from typing import List, Optional

import asyncssh
from asyncssh import SSHKey
from asyncssh.misc import MaybeAwait
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from openssh_key_parse import protocol_binary_to_authorized_key_rsa

class demo_ssh_server(asyncssh.SSHServer):
    def __init__(self):
        self.auth_client_pub_keys: List[bytes] = []
        pass

    def begin_auth(self, username: str) -> MaybeAwait[bool]:
        # 预先加载好客户认证需要的密钥对
        local_key_path = ['/Users/cyx/.ssh/devcloud_ssh/devcloud_id_rsa', 
                         '/Users/cyx/.ssh/id_rsa']
        private_key = None
        
        for p in local_key_path:
            with open(p, 'rb') as key_file:
                key_file_data = key_file.read()
                if key_file_data.startswith('-----BEGIN OPENSSH PRIVATE KEY-----'.encode(encoding='utf-8')):
                    private_key = serialization.load_ssh_private_key(
                        key_file_data, password=None, backend=default_backend())
                if key_file_data.startswith('-----BEGIN RSA PRIVATE KEY-----'.encode(encoding='utf-8')):
                    private_key = serialization.load_pem_private_key(
                        key_file_data, password=None, backend=default_backend())
                
                pub_key_data = private_key.public_key().public_bytes(
                    encoding=serialization.Encoding.OpenSSH,
                    format=serialization.PublicFormat.OpenSSH)
                self.auth_client_pub_keys.append(pub_key_data)
        return True

    def connection_lost(self, exc: Optional[Exception]) -> None:
        if exc:
            print('SSH connection error: ' + str(exc), file=sys.stderr)
        else:
            print('SSH connection closed.')

    def password_auth_supported(self) -> bool:
        return False

    def kbdint_auth_supported(self) -> bool:
        return False

    def public_key_auth_supported(self) -> bool:
        return True

    def validate_public_key(self, username: str, key: SSHKey) -> MaybeAwait[bool]:
        print('try login, username:{}, pub key:{}'.format(
            username, hashlib.md5(key.public_data[:-16]).hexdigest()))
        
        for i in self.auth_client_pub_keys:
            if i == protocol_binary_to_authorized_key_rsa(key.public_data):
                print(username, '==> pub key matched, login success!!!')
                return True
        
        print(username, '--> pub key mis-matched')
        return False

async def start_server() -> None:
    await asyncssh.create_server(
        server_factory=demo_ssh_server,
        host='127.0.0.1',
        port=18822,
        server_version='DEMO-TEST-AsyncSSH_2.13.1',
        server_host_keys=['/Users/cyx/.ssh/devcloud_ssh/devcloud_id_rsa'],
    )
    await asyncio.Event().wait()

if __name__ == '__main__':
    try:
        print(asyncssh.__version__)
        asyncio.run(start_server())
    except (OSError, asyncssh.Error) as exc:
        sys.exit('Error starting server: ' + str(exc))

其中,ssh 服务器在接收到客户端发送过来的公钥时,需要对公钥数据做一些格式的转化。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
def protocol_binary_to_authorized_key_rsa(data: bytes) -> bytes:
    # 使用 Paramiko 将二进制数据转换为公钥对象
    paramiko_key = paramiko.RSAKey(data = data)
    # 将公钥对象转换为 OpenSSH 格式的文本公钥
    public_key_openssh = paramiko_key.get_base64()
    # 拼接密钥类型(如 "ssh-rsa")和 Base64 编码的公钥数据
    openssh_public_key = b"ssh-rsa " + public_key_openssh.encode("utf-8")
    # print(openssh_public_key)
    return openssh_public_key


def authorized_key_to_protocol_binary(data: bytes) -> bytes:
    # 从 OpenSSH 格式文本公钥中提取 Base64 编码的部分
    key_base64 = data.decode().split(" ", 2)
    key_data = base64.b64decode(key_base64[1])
    # 使用 Paramiko 将二进制数据转换为公钥对象
    paramiko_key = paramiko.RSAKey(data = key_data)
    # 获取二进制 SSH RSA 公钥数据
    binary_ssh_rsa_public_key = paramiko_key.asbytes()
    # print(binary_ssh_rsa_public_key)
    return binary_ssh_rsa_public_key

这里的转化效果可以通过以下的例子来测试:

1
2
3
4
5
6
7
8
9
# 二进制格式的 SSH RSA 公钥,其中包含了公钥的类型、长度和实际数据。这种格式在 SSH 协议中用于传输公钥。
ssh_protocol_binary_rsa = b'\x00\x00\x00\x07ssh-rsa\x00\x00\x00\x03\x01\x00\x01\x00\x00\x01\x01\x00\xb4{+(\xf4\xb0\xa6\xc7\xbb\xac\xe8^u\xc2BR\xe4\x89,\\t\xf7t\x87\x91\x1a\xbf#$\x87\xd3MjA+\xb1Z\xee^\xa3\x0f\xcc\t\x05\xb8\xddt\x1ayt\xcdk\x8eZ\xaa\x1a9\x9fzZ\xa2\x05\xe1\x84\t\xbc=\x9d"\x98\x87\xd5\x80C\xd7\x88\x183\x90:\x85\x0e\xafc*\xabI\xb1\xc8\xf0\x9c~\xbe\xc2\xd8\xb1a\x11\xe5\xbeh\x9e\xa8\x9cU\xf9\xbd\x88\x86\x14\xbd\x97\xf9\xb0\xf7n\xb5\xd2Z>#nGs\x8f\x0bL\xc7J\xcd\x1b\xcft\xeaJ\xd9\x98\xcc\x8969\x16\x7f\x10\xbf\x06\x07\x9b\xb6jbUB\x87j8P\x1cc\x11*I\xb8~\x95\xc7\xa1\x0c\xc7\x1fM\x18*[\xabv\x91\txB\r\x1a9\xe8G\x10\'A\xf7\x0c"\xd8g\xe06\xe4\xf6WFvf\xe5\x8ayz8\x06\xf6q\xa5`\x00O\x86\\\x18\xd5v$\xf8\xacz\xfc^S\xa9V8b\xa7\xb8\xa4{\xbd\xf2\xd0I77\xde\x81\x98\xa9\xd6NB\xf9\x13J\x1b\xe5\x08\xfb\xcf\xdb\x19'

# OpenSSH 格式的文本数据,以 "ssh-rsa" 开头,后面跟着 Base64 编码的 RSA 公钥。这种格式通常用于 authorized_keys 文件。
ssh_authorized_keys_rsa = b'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQC0eyso9LCmx7us6F51wkJS5IksXHT3dIeRGr8jJIfTTWpBK7Fa7l6jD8wJBbjddBp5dM1rjlqqGjmfelqiBeGECbw9nSKYh9WAQ9eIGDOQOoUOr2Mqq0mxyPCcfr7C2LFhEeW+aJ6onFX5vYiGFL2X+bD3brXSWj4jbkdzjwtMx0rNG8906krZmMyJNjkWfxC/BgebtmpiVUKHajhQHGMRKkm4fpXHoQzHH00YKlurdpEJeEINGjnoRxAnQfcMIthn4Dbk9ldGdmblinl6OAb2caVgAE+GXBjVdiT4rHr8XlOpVjhip7ike73y0Ek3N96BmKnWTkL5E0ob5Qj7z9sZ'

if __name__ == '__main__':
    assert (ssh_authorized_keys_rsa == protocol_binary_to_authorized_key_rsa(ssh_protocol_binary_rsa))
    assert (ssh_protocol_binary_rsa == authorized_key_to_protocol_binary(ssh_authorized_keys_rsa))

如果程序可以正常执行完成,则说明数据格式转化成功。

用户连接协议的定制化

在使用 ssh 的过程中,最常见的场景是用户通过 ssh 客户端登录进 ssh 服务器后,打开一个 shell 交互的连接。在用户连接层,我们可以自定义 ssh 认证完成后的逻辑,比如服务端给客户端返回一个字符 UI 界面。

本文基于 asyncssh 和 prompt_toolkit 进行这种效果的 demo 实现。

img

img

img

源码如下:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
import asyncio
import platform
import sys
import time
from typing import Dict, List, Optional

import asyncssh
from asyncssh.misc import MaybeAwait
from prompt_toolkit.contrib.ssh import PromptToolkitSSHServer, PromptToolkitSSHSession
from prompt_toolkit.shortcuts import print_formatted_text, ProgressBar
from prompt_toolkit.shortcuts.dialogs import input_dialog, yes_no_dialog
from prompt_toolkit.shortcuts.prompt import PromptSession
from prompt_toolkit.completion import WordCompleter
from prompt_toolkit.lexers import PygmentsLexer
from pygments.lexers.html import HtmlLexer

def sys_info() -> Dict[str, str]:
    return {
        'PythonVersion': sys.version,
        'ApiVersion': sys.api_version,
        'OSPlatform': sys.platform,
        'OSProcessor': platform.platform(),
        'BytesEndian': sys.byteorder
    }

class ssh_funny_tool(PromptToolkitSSHServer):
    def __init__(self, interact):
        super().__init__(interact)
        self.server_info = sys_info()
        self.connected_time: str = ''

    def connection_made(self, conn: asyncssh.SSHServerConnection) -> None:
        self.connected_time = time.strftime('%Y-%m-%d %H:%M:%S %p %A %B')

    def connection_lost(self, exc: Optional[Exception]) -> None:
        if exc:
            print('SSH connection error: ' + str(exc), file=sys.stderr)
        else:
            print('SSH connection closed.')

    def begin_auth(self, username: str) -> MaybeAwait[bool]:
        print(username, self.connected_time)
        return False  # 不需要认证

animal_completer = WordCompleter([
    "alligator", "ant", "ape", "bat", "bear", "beaver", "bee", "bison", 
    "butterfly", "cat", "chicken", "crocodile", "dinosaur", "dog", "dolphin",
    "dove", "duck", "eagle", "elephant", "fish", "goat", "gorilla", "kangaroo",
    "leopard", "lion", "mouse", "rabbit", "rat", "snake", "spider", "turkey", 
    "turtle",
], ignore_case=True)

async def interact(ssh_session: PromptToolkitSSHSession) -> None:
    """The application interaction."""
    prompt_session = PromptSession()
    print = print_formatted_text
    
    print("We will be running a few prompt_toolkit applications through this ")
    print("SSH connection.\n")
    
    # Simple progress bar
    with ProgressBar() as pb:
        for i in pb(range(50)):
            await asyncio.sleep(0.1)
    
    # Normal prompt
    text = await prompt_session.prompt_async("(normal prompt) Type something: ")
    print("You typed", text)
    
    # Prompt with auto completion
    text = await prompt_session.prompt_async(
        "(autocompletion) Type an animal: ", 
        completer=animal_completer
    )
    print("You typed", text)
    
    # prompt with syntax highlighting
    text = await prompt_session.prompt_async(
        "(HTML syntax highlighting) Type something: ",
        lexer=PygmentsLexer(HtmlLexer)
    )
    print("You typed", text)
    
    # Show yes/no dialog
    await prompt_session.prompt_async("Showing yes/no dialog... [ENTER]")
    user_choice = await yes_no_dialog("Yes/no dialog", "Running over asyncssh").run_async()
    print('user choice:{}'.format(user_choice))
    
    # Show input dialog
    await prompt_session.prompt_async("Showing input dialog... [ENTER]")
    user_input = await input_dialog("Input dialog", "Running over asyncssh").run_async()
    print('user input:{}'.format(user_input))

async def start_server() -> None:
    await asyncssh.create_server(
        server_factory=lambda: ssh_funny_tool(interact),
        host='127.0.0.1',
        port=18822,
        server_version='DEMO-TEST-AsyncSSH_2.13.1',
        server_host_keys=['/Users/cyx/.ssh/devcloud_ssh/devcloud_id_rsa'],
    )
    await asyncio.Event().wait()

if __name__ == '__main__':
    try:
        print(asyncssh.__version__)
        asyncio.run(start_server())
    except (OSError, asyncssh.Error) as exc:
        sys.exit('Error starting server: ' + str(exc))