自签(RSA)证书的生成与使用

一、概述

本方案通过自签证书实现细粒度访问控制,结合非对称加密技术保障数据安全传输。核心功能包括:

  1. 生成带访问策略的自签证书
  2. 证书有效性验证与策略解析
  3. 基于证书公钥的敏感数据加密
  4. 使用私钥进行数据解密

二、证书生成流程

1. 密钥对生成

使用RSA算法生成2048位密钥对:

1
2
3
4
private_key = rsa.generate_private_key(
    public_exponent=65537,
    key_size=2048
)

密钥长度符合行业安全标准,可有效防御暴力破解

2. 策略嵌入

通过X.509证书扩展字段注入安全策略:

策略类型OID标识符数据类型示例值
资源访问白名单1.3.6.1.4.1.12345.1.1字符串列表“/api/v1”, “/data/export”
账户授权列表1.3.6.1.4.1.12345.1.2字符串列表“admin”, “audit_user”
IP访问规则1.3.6.1.4.1.12345.1.3JSON对象{“ip_list”:“192.168.1.1”, “cidr”:“10.0.0.0/8”}

3. 证书签名

采用SHA-512哈希算法进行自签名,有效期建议不超过90天:

1
cert = builder.sign(private_key, hashes.SHA512())

短期有效策略可降低证书泄露风险

三、证书验证机制

验证流程包含三重安全检查:

  1. 基础验证
  • 证书有效期检查(UTC时间戳比对)
  • 证书链完整性验证
  1. 签名验证

    public_key.verify(cert.signature, cert.tbs_certificate_bytes, ...)使用PKCS#1 v1.5填充方案验证签名真实性

  2. 策略解析

    自动提取扩展字段并转换为可读策略,异常策略会记录至validation_errors

四、数据安全分发

  • 支持最大加密长度:密钥长度/8 - 2*SHA256长度 - 2(例:2048位密钥支持190字节明文)
  • 采用OAEP填充方案,比传统PKCS#1更安全

代码示例

1
2
3
4
5
# 加密
cipher_data = cert.public_key().encrypt(data, padding.OAEP(...))

# 解密
decrypted = private_key.decrypt(cipher_data, padding.OAEP(...))

五、完整代码

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
from __future__ import annotations

import base64
import json
from datetime import datetime, timedelta, UTC
from typing import Tuple

from cryptography import x509
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding, rsa
from cryptography.x509 import Certificate, Extension, ObjectIdentifier
from cryptography.x509.oid import NameOID

# 自定义OID定义(示例使用1.3.6.1.4.1.12345命名空间)
ALLOWED_RESOURCES_OID = ObjectIdentifier("1.3.6.1.4.1.12345.1.1")
ALLOWED_ACCOUNTS_OID = ObjectIdentifier("1.3.6.1.4.1.12345.1.2")
SOURCE_IP_RULES_OID = ObjectIdentifier("1.3.6.1.4.1.12345.1.3")


def generate_cert_with_policy(
    allowed_resources: list[str],
    allowed_accounts: list[str],
    ip_rules: dict,
    validity_days: int
) -> tuple[bytes, bytes]:
    """
    生成包含访问策略的自签名证书

    :param allowed_resources: 允许访问的资源路径列表
    :param allowed_accounts: 允许使用的账号列表
    :param ip_rules: IP规则字典,包含以下键:
        - "ip_list": 具体IP列表
        - "cidr": CIDR列表
        - "internal": 是否允许所有内网IP
        - "public": 是否允许所有公网IP
    :param validity_days: 证书有效天数
    :return: (证书PEM字节, 私钥PEM字节)
    """
    # 生成密钥对
    private_key = rsa.generate_private_key(
        public_exponent = 65537,
        key_size = 2048,
    )

    # 构建主题信息
    subject = issuer = x509.Name([
        x509.NameAttribute(NameOID.COUNTRY_NAME, "CN"),
        x509.NameAttribute(NameOID.ORGANIZATION_NAME, "MyOrg"),
        x509.NameAttribute(NameOID.COMMON_NAME, "Access Control Certificate"),
    ])

    # 构建证书
    builder = (
        x509.CertificateBuilder()
        .subject_name(subject)
        .issuer_name(issuer)
        .public_key(private_key.public_key())
        .serial_number(x509.random_serial_number())
        .not_valid_before(datetime.now(UTC))
        .not_valid_after(datetime.now(UTC) + timedelta(days = validity_days))
    )

    # 添加自定义扩展
    extensions = [
        # 允许访问的资源
        Extension(
            ALLOWED_RESOURCES_OID,
            critical = True,  # 表明扩展包含关键安全策略,否则是辅助拓展信息
            value = x509.UnrecognizedExtension(
                ALLOWED_RESOURCES_OID,
                ",".join(allowed_resources).encode()
            )
        ),
        # 允许使用的账号
        Extension(
            ALLOWED_ACCOUNTS_OID,
            critical = True,  # 表明扩展包含关键安全策略,否则是辅助拓展信息
            value = x509.UnrecognizedExtension(
                ALLOWED_ACCOUNTS_OID,
                ",".join(allowed_accounts).encode()
            )
        ),
        # IP访问规则
        Extension(
            SOURCE_IP_RULES_OID,
            critical = True,  # 表明扩展包含关键安全策略,否则是辅助拓展信息
            value = x509.UnrecognizedExtension(
                SOURCE_IP_RULES_OID,
                json.dumps(ip_rules).encode()
            )
        )
    ]

    for ext in extensions:
        builder = builder.add_extension(ext.value, critical = ext.critical)

    # 签名证书
    cert = builder.sign(private_key, hashes.SHA512())

    # 序列化输出
    return (
        cert.public_bytes(serialization.Encoding.PEM),
        private_key.private_bytes(
            encoding = serialization.Encoding.PEM,
            format = serialization.PrivateFormat.PKCS8,
            encryption_algorithm = serialization.NoEncryption()
        )
    )


def full_verify_certificate(cert_pem: bytes) -> Tuple[dict, Certificate]:
    """
    完整证书验证(包含内容解析和签名验证)

    返回结构:
    {
        "basic_valid": bool,          # 基础验证是否通过
        "signature_valid": bool,      # 签名验证结果
        "policy_data": dict,          # 策略数据
        "validation_errors": list,    # 验证错误信息
        "cert_info": dict             # 证书基础信息
    }
    """
    cert_parse_result = {
        "basic_valid": True,
        "signature_valid": False,
        "policy_data": {},
        "validation_errors": [],
        "cert_info": {}
    }

    try:
        # 加载证书
        cert = x509.load_pem_x509_certificate(cert_pem)

        # 记录基础信息
        cert_parse_result["cert_info"].update({
            "subject": cert.subject.rfc4514_string(),
            "issuer": cert.issuer.rfc4514_string(),
            "serial_number": cert.serial_number,
            "valid_from": cert.not_valid_before_utc.astimezone().isoformat(),
            "valid_to": cert.not_valid_after_utc.astimezone().isoformat(),
            "is_self_signed": cert.issuer == cert.subject
        })

        # 基础有效性验证
        current_time = datetime.now(UTC)
        if current_time < cert.not_valid_before_utc:
            cert_parse_result["validation_errors"].append("证书尚未生效")
            cert_parse_result["basic_valid"] = False
        if current_time > cert.not_valid_after_utc:
            cert_parse_result["validation_errors"].append("证书已过期")
            cert_parse_result["basic_valid"] = False

        # 签名验证(自签名验证)
        public_key = cert.public_key()
        try:
            # 验证签名算法与证书一致
            public_key.verify(
                cert.signature,
                cert.tbs_certificate_bytes,
                padding.PKCS1v15(),
                cert.signature_hash_algorithm
            )
            cert_parse_result["signature_valid"] = True
        except InvalidSignature:
            cert_parse_result["validation_errors"].append("证书签名无效")
            cert_parse_result["signature_valid"] = False
        except TypeError as e:
            cert_parse_result["validation_errors"].append(f"签名算法不匹配: {str(e)}")
            cert_parse_result["signature_valid"] = False

        # 策略扩展解析
        policy_data = {}
        try:
            # 资源白名单解析
            res_ext = cert.extensions.get_extension_for_oid(
                x509.ObjectIdentifier(ALLOWED_RESOURCES_OID.dotted_string)
            )
            policy_data["allowed_resources"] = res_ext.value.public_bytes().decode().split(",")
        except x509.ExtensionNotFound:
            cert_parse_result["validation_errors"].append("缺少资源白名单扩展")

        try:
            # 账户白名单解析
            acc_ext = cert.extensions.get_extension_for_oid(
                x509.ObjectIdentifier(ALLOWED_ACCOUNTS_OID.dotted_string)
            )
            policy_data["allowed_accounts"] = acc_ext.value.public_bytes().decode().split(",")
        except x509.ExtensionNotFound:
            cert_parse_result["validation_errors"].append("缺少账户白名单扩展")

        try:
            # IP规则解析
            ip_ext = cert.extensions.get_extension_for_oid(
                x509.ObjectIdentifier(SOURCE_IP_RULES_OID.dotted_string)
            )
            policy_data.update(json.loads(ip_ext.value.public_bytes().decode()))
        except x509.ExtensionNotFound:
            cert_parse_result["validation_errors"].append("缺少IP规则扩展")
        except json.JSONDecodeError:
            cert_parse_result["validation_errors"].append("IP规则格式错误")

        cert_parse_result["policy_data"] = policy_data

    except ValueError as e:
        cert_parse_result["validation_errors"].append(f"证书格式错误: {str(e)}")
        cert_parse_result["basic_valid"] = False
        return cert_parse_result, Certificate()
    else:
        return cert_parse_result, cert


def encrypt_by_public_key(encrypt_cert: Certificate, data: bytes) -> bytes:
    """
    使用公钥加密数据
    最大明文长度 = 密钥模长(字节) - 2 × 哈希函数输出长度(字节) - 2

    返回结构:
    bytes
    """
    max_plaintext_length = encrypt_cert.public_key().key_size // 8 - 2 * hashes.SHA256().digest_size - 2
    if len(data) > max_plaintext_length:
        raise ValueError(f"明文长度超出限制:当前长度 {len(data)},最大允许 {max_plaintext_length} 字节")
    return encrypt_cert.public_key().encrypt(
        data,
        padding.OAEP(
            mgf = padding.MGF1(algorithm = hashes.SHA256()),
            algorithm = hashes.SHA256(),
            label = None
        ))


def decrypt_by_private_key(decrypt_private: bytes, data: bytes) -> bytes:
    """
    使用私钥解密数据

    返回结构:
    bytes
    """
    rsa_private_key = serialization.load_pem_private_key(
        decrypt_private,
        password = None  # 无加密时设为 None
    )
    return rsa_private_key.decrypt(
        data,
        padding.OAEP(
            mgf = padding.MGF1(algorithm = hashes.SHA256()),
            algorithm = hashes.SHA256(),
            label = None
        )
    )


if __name__ == "__main__":
    # 证书生成
    # 生成证书的拓展策略信息
    # 策略信息包括:此证书允许访问的资源、允许使用此证书的账户、允许访问的IP地址信息
    cert_bytes, private_key_bytes = generate_cert_with_policy(
        allowed_resources = ["12345", "67890"],
        allowed_accounts = ["12345", "67890"],
        ip_rules = {
            "ip_list": ["127.0.0.1"],
            "cidr": ["192.168.0.0/16"],
            "internal": True,
            "public": True
        },
        validity_days = 30
    )

    # 证书验证: 验证证书的有效性、完整性、签名、证书拓展的策略信息
    result, cert = full_verify_certificate(cert_bytes)
    # print(f"验证结果:{json.dumps(result, indent = 4, ensure_ascii = False)}")
    print(f"证书验证成功:{result['basic_valid']}")
    print(f"签名验证成功:{result['signature_valid']}")
    print(f"密钥长度:{cert.public_key().key_size // 8}")

    # 使用证书加密敏感数据
    plain_data = {"key": "test_1234567890", "value": "test_1234567890"}
    sensitive_data = json.dumps(plain_data).encode()
    cipher_data = encrypt_by_public_key(cert, sensitive_data)
    print(f"加密前的数据:{base64.b64encode(sensitive_data).decode()}, 长度:{len(sensitive_data)} "
          f"加密后的数据:{base64.b64encode(cipher_data).decode()}, 长度:{len(cipher_data)}")

    # 使用私钥解密敏感数据
    decrypted_data = decrypt_by_private_key(private_key_bytes, cipher_data)
    print(f"解密后的数据:{base64.b64encode(decrypted_data).decode()}, 长度:{len(decrypted_data)}")

    assert decrypted_data == sensitive_data, "使用私钥解密失败"
    print(f"解密成功:{decrypted_data == sensitive_data}")

img