1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import asyncio
import json
import sys
import time
from typing import Any, Dict, List, Optional

import asyncssh
from asyncssh.misc import MaybeAwait, PasswordChangeRequired, PermissionDenied
class demo_ssh_server(asyncssh.SSHServer):
    """Demo SSH server that records how a client authenticated.

    Only password authentication is enabled. Two demo user names are handled:

    * ``test``   -- accepted only when the password is ``test``.
    * ``change`` -- a non-empty password triggers a password-change request;
      an empty password is accepted directly.

    Any other user name is rejected. The recorded state (method, credentials,
    retry counters, result) is stored on the instance so that the session
    object created by ``session_requested`` can read it.

    NOTE: user names and passwords are kept and printed in plaintext. That is
    only appropriate for a local demo / test server.
    """

    def __init__(self):
        """Set the retry limits and reset all authentication bookkeeping."""
        # Limits compared against the two retry counters below.
        self.AUTH_MAX_RETRIES: int = 2
        self.CHANGE_MAX_RETRIES: int = 2
        # Number of rejected password attempts / password-change attempts.
        self.password_retried: int = 0
        self.update_password_retried: int = 0
        # Timestamp string filled in by connection_made().
        self.connected_time = ''
        # Details of the most recent authentication attempt.
        self.auth_method = 'none'
        self.auth_username = ''
        self.auth_password = ''
        self.auth_new_password = ''
        self.auth_success: bool = False

    def connection_made(self, conn: asyncssh.SSHServerConnection) -> None:
        """Record the local time at which the connection was made."""
        self.connected_time = time.strftime('%Y-%m-%d %H:%M:%S %p %A %B')

    def session_requested(self):
        """Return a new session object that has access to this server's state."""
        return demo_ssh_session(self)

    def connection_lost(self, exc: Optional[Exception]) -> None:
        """Log why the connection ended: to stderr on error, stdout otherwise."""
        if exc:
            print('SSH connection error: ' + str(exc), file=sys.stderr)
        else:
            print('SSH connection closed.')

    def begin_auth(self, username: str) -> bool:
        """Log the user name and report that authentication is required."""
        print('begin_auth, username:{}'.format(username))
        return True  # authentication is required

    def password_auth_supported(self) -> bool:
        """Enable password authentication."""
        return True

    def validate_password(self, username: str, password: str) -> MaybeAwait[bool]:
        """Check a user name / password pair against the demo rules.

        Returns:
            ``True`` when the credentials are accepted, ``False`` when they are
            rejected (the failed-attempt counter is incremented).

        Raises:
            PermissionDenied: either retry counter already exceeds its limit.
            PasswordChangeRequired: user ``change`` supplied a non-empty
                password and must now choose a new one.
        """
        self.auth_method = 'password'
        self.auth_username = username
        self.auth_password = password
        # Report the password-attempt counter next to its own limit (the
        # original printed the password-change counter here by mistake).
        print('username:{}, password:{}, auth password retried:{}, MAX LIMIT:{}'.format(
            username, password, self.password_retried, self.AUTH_MAX_RETRIES))

        # Refuse outright once either retry counter has exceeded its limit.
        if (self.password_retried > self.AUTH_MAX_RETRIES
                or self.update_password_retried > self.CHANGE_MAX_RETRIES):
            raise PermissionDenied(
                reason='demo ssh server, password auth failed, user:[{}]'.format(username))

        # Only the user names "test" and "change" are known.
        if username not in ('test', 'change'):
            self.password_retried += 1
            return False

        # User "test" must supply the password "test".
        if username == 'test' and password != 'test':
            self.password_retried += 1
            return False

        # User "change" is asked to change the password; the request is
        # counted as an attempt, exactly as before.
        if username == 'change' and password:
            self.password_retried += 1
            raise PasswordChangeRequired(
                prompt='change your password for username:[{}]'.format(username))

        self.auth_success = True
        return True

    def change_password(self, username: str, old_password: str, new_password: str) -> MaybeAwait[bool]:
        """Handle a password-change request.

        Returns:
            ``True`` once a non-empty new password that differs from the old
            one has been supplied; the new password is recorded.

        Raises:
            PermissionDenied: the change-attempt counter exceeds its limit.
            PasswordChangeRequired: the new password is empty or equal to the
                old one, so the client is asked to try again.
        """
        print('update password retried:{}, MAX LIMIT:{}'.format(
            self.update_password_retried, self.CHANGE_MAX_RETRIES))

        # Too many change attempts: deny.
        if self.update_password_retried > self.CHANGE_MAX_RETRIES:
            raise PermissionDenied(
                reason='demo ssh server, update password failed, user:[{}]'.format(username))

        # An unchanged or empty new password means the client must retry.
        if old_password == new_password or not new_password:
            self.update_password_retried += 1
            raise PasswordChangeRequired('retry change password for [{}]'.format(username))

        self.auth_new_password = new_password
        # Authenticated by password, and the password was updated.
        self.auth_method = 'password and updated'
        self.auth_success = True
        return True

    def auth_completed(self) -> None:
        """Print a one-line summary of the recorded authentication state."""
        print('demo ssh server, auth_method:{}, '
              'auth_username:{}, auth_password:{}, '
              'auth_pwd_retried:{}, changed_pwd_retried:{}, '
              'new_password:{}, auth_result:{}'.format(
                  self.auth_method, self.auth_username, self.auth_password,
                  self.password_retried, self.update_password_retried,
                  self.auth_new_password, self.auth_success))
class demo_ssh_session(asyncssh.SSHServerSession):
    """Session that reports the server's recorded login details to the client."""

    def __init__(self, server: demo_ssh_server):
        """Keep a reference to the server whose authentication state is reported."""
        self.server: demo_ssh_server = server

    def connection_made(self, channel):
        """Write the login summary as one JSON line, then exit with status 0."""
        super().connection_made(channel)
        srv = self.server
        summary: Dict[str, Any] = dict(
            auth_method=srv.auth_method,
            auth_username=srv.auth_username,
            auth_password=srv.auth_password,
            password_retried=srv.password_retried,
            update_password_retried=srv.update_password_retried,
            auth_new_password=srv.auth_new_password,
            auth_succeed=srv.auth_success,
        )
        payload = json.dumps(summary, sort_keys=True, ensure_ascii=False)
        # The session exists once authentication has succeeded, so the login
        # details are written back to the client at this point.
        channel.write(payload + "\n")
        channel.exit(0)
async def start_server(
    host: str = '127.0.0.1',
    port: int = 18822,
    server_host_keys: Optional[List[str]] = None,
) -> None:
    """Start the demo SSH server and keep this coroutine alive.

    Args:
        host: Address to listen on.
        port: TCP port to listen on.
        server_host_keys: List of host private-key file paths handed to
            ``asyncssh.create_server``. When ``None``, the previously
            hard-coded development key path is used, so existing callers
            that pass no arguments behave as before.

    Exceptions raised by ``asyncssh.create_server`` are not caught here.
    """
    if server_host_keys is None:
        # Backward-compatible default; pass your own key paths on other machines.
        server_host_keys = ['/Users/cyx/.ssh/devcloud_ssh/devcloud_id_rsa']
    await asyncssh.create_server(
        server_factory=demo_ssh_server,
        host=host,
        port=port,
        server_version='DEMO-TEST-AsyncSSH_2.13.1',  # custom SSH protocol version string
        server_host_keys=server_host_keys,
    )
    # Wait on an event that is never set so the server keeps running.
    await asyncio.Event().wait()
if __name__ == '__main__':
    # Script entry point: print the asyncssh version, then run the server.
    # Start-up failures end the process with a readable message.
    try:
        print(asyncssh.__version__)
        asyncio.run(start_server())
    except (OSError, asyncssh.Error) as startup_error:
        sys.exit('Error starting server: {}'.format(startup_error))
|