在最近的折腾中,发现某个 Python 程序运行,服务了一段时间以后老是占用很多很多的内存。

然后用了 这里 的方法去调查,发现是某个对象没有被释放,在内存中越积累越多。

当然,最后这问题得以解决了,在此将这个问题进行记录,也提醒自己以后不要犯这种问题。

来看其中这样的一组代码(将无关部分暂时隐去,想看完整的可以看这里):

cryptor.py:

#!/usr/bin/env python
#
# Copyright 2012-2015 clowwindy
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import os
import sys
import hashlib
import logging

from shadowsocks import common
from shadowsocks.crypto import rc4_md5, openssl, mbedtls, sodium, table


CIPHER_ENC_ENCRYPTION = 1
CIPHER_ENC_DECRYPTION = 0

METHOD_INFO_KEY_LEN = 0
METHOD_INFO_IV_LEN = 1
METHOD_INFO_CRYPTO = 2

method_supported = {}
method_supported.update(rc4_md5.ciphers)
method_supported.update(openssl.ciphers)
method_supported.update(mbedtls.ciphers)
method_supported.update(sodium.ciphers)
method_supported.update(table.ciphers)

class Cryptor(object):
    def __init__(self, password, method, crypto_path=None):
        """
        Crypto wrapper
        :param password: str cipher password
        :param method: str cipher
        :param crypto_path: dict or none
            {'openssl': path, 'sodium': path, 'mbedtls': path}
        """
        self.password = password
        self.key = None
        self.method = method
        self.iv_sent = False
        self.cipher_iv = b''
        self.decipher = None
        self.decipher_iv = None
        self.crypto_path = crypto_path
        method = method.lower()
        self._method_info = Cryptor.get_method_info(method)
        if self._method_info:
            self.cipher = self.get_cipher(
                password, method, CIPHER_ENC_ENCRYPTION,
                random_string(self._method_info[METHOD_INFO_IV_LEN])
            )
        else:
            logging.error('method %s not supported' % method)
            sys.exit(1)

    @staticmethod
    def get_method_info(method):
        method = method.lower()
        m = method_supported.get(method)
        return m

    def iv_len(self):
        return len(self.cipher_iv)

    def get_cipher(self, password, method, op, iv):
        password = common.to_bytes(password)
        m = self._method_info
        if m[METHOD_INFO_KEY_LEN] > 0:
            key, _ = EVP_BytesToKey(password,
                                    m[METHOD_INFO_KEY_LEN],
                                    m[METHOD_INFO_IV_LEN])
        else:
            # key_length == 0 indicates we should use the key directly
            key, iv = password, b''
        self.key = key
        iv = iv[:m[METHOD_INFO_IV_LEN]]
        if op == CIPHER_ENC_ENCRYPTION:
            # this iv is for cipher not decipher
            self.cipher_iv = iv
        return m[METHOD_INFO_CRYPTO](method, key, iv, op, self.crypto_path)

    def encrypt(self, buf):
        if len(buf) == 0:
            return buf
        if self.iv_sent:
            return self.cipher.encrypt(buf)
        else:
            self.iv_sent = True
            return self.cipher_iv + self.cipher.encrypt(buf)

    def decrypt(self, buf):
        if len(buf) == 0:
            return buf
        if self.decipher is None:
            decipher_iv_len = self._method_info[METHOD_INFO_IV_LEN]
            decipher_iv = buf[:decipher_iv_len]
            self.decipher_iv = decipher_iv
            self.decipher = self.get_cipher(
                self.password, self.method,
                CIPHER_ENC_DECRYPTION,
                decipher_iv
            )
            buf = buf[decipher_iv_len:]
            if len(buf) == 0:
                return buf
        return self.decipher.decrypt(buf)


def gen_key_iv(password, method):
    method = method.lower()
    (key_len, iv_len, m) = method_supported[method]
    if key_len > 0:
        key, _ = EVP_BytesToKey(password, key_len, iv_len)
    else:
        key = password
    iv = random_string(iv_len)
    return key, iv, m


def encrypt_all_m(key, iv, m, method, data, crypto_path=None):
    result = [iv]
    cipher = m(method, key, iv, 1, crypto_path)
    result.append(cipher.encrypt_once(data))
    return b''.join(result)


def decrypt_all(password, method, data, crypto_path=None):
    result = []
    method = method.lower()
    (key, iv, m) = gen_key_iv(password, method)
    iv = data[:len(iv)]
    data = data[len(iv):]
    cipher = m(method, key, iv, CIPHER_ENC_DECRYPTION, crypto_path)
    result.append(cipher.decrypt_once(data))
    return b''.join(result), key, iv


def encrypt_all(password, method, data, crypto_path=None):
    result = []
    method = method.lower()
    (key, iv, m) = gen_key_iv(password, method)
    result.append(iv)
    cipher = m(method, key, iv, CIPHER_ENC_ENCRYPTION, crypto_path)
    result.append(cipher.encrypt_once(data))
    return b''.join(result)

这个的目的主要就是对于发送来的数据进行加解密,使用的时候将这个类实例化,或者直接调用最后的三个方法来加解密数据。

然后是这个,我们拿泄露的那个对象所在的 openssl 来说

class OpenSSLCryptoBase(object):
    """
    OpenSSL crypto base class
    """
    def __init__(self, cipher_name, crypto_path=None):
        self._ctx = None
        self._cipher = None
        if not loaded:
            load_openssl(crypto_path)
        cipher_name = common.to_bytes(cipher_name)
        cipher = libcrypto.EVP_get_cipherbyname(cipher_name)
        if not cipher:
            cipher = load_cipher(cipher_name)
        if not cipher:
            raise Exception('cipher %s not found in libcrypto' % cipher_name)
        self._ctx = libcrypto.EVP_CIPHER_CTX_new()
        self._cipher = cipher
        if not self._ctx:
            raise Exception('can not create cipher context')

        self.encrypt_once = self.update
        self.decrypt_once = self.update

    def update(self, data):
        """
        Encrypt/decrypt data
        :param data: str
        :return: str
        """
        global buf_size, buf
        cipher_out_len = c_long(0)
        l = len(data)
        if buf_size < l:
            buf_size = l * 2
            buf = create_string_buffer(buf_size)
        libcrypto.EVP_CipherUpdate(
            self._ctx, byref(buf),
            byref(cipher_out_len), c_char_p(data), l
        )
        # buf is copied to a str object when we access buf.raw
        return buf.raw[:cipher_out_len.value]

    def __del__(self):
        self.clean()

    def clean(self):
        if self._ctx:
            ctx_cleanup(self._ctx)
            libcrypto.EVP_CIPHER_CTX_free(self._ctx)


class OpenSSLAeadCrypto(OpenSSLCryptoBase, AeadCryptoBase):
    """
    Implement OpenSSL Aead mode: gcm, ocb
    """
    def __init__(self, cipher_name, key, iv, op, crypto_path=None):
        OpenSSLCryptoBase.__init__(self, cipher_name, crypto_path)
        AeadCryptoBase.__init__(self, cipher_name, key, iv, op, crypto_path)

        key_ptr = c_char_p(self._skey)
        r = libcrypto.EVP_CipherInit_ex(
            self._ctx,
            self._cipher,
            None,
            key_ptr, None,
            c_int(op)
        )
        if not r:
            self.clean()
            raise Exception('can not initialize cipher context')

        r = libcrypto.EVP_CIPHER_CTX_ctrl(
            self._ctx,
            c_int(EVP_CTRL_AEAD_SET_IVLEN),
            c_int(self._nlen),
            None
        )
        if not r:
            self.clean()
            raise Exception('Set ivlen failed')

        self.cipher_ctx_init()

    def cipher_ctx_init(self):
        """
        Need init cipher context after EVP_CipherFinal_ex to reuse context
        :return: None
        """
        iv_ptr = c_char_p(self._nonce.raw)
        r = libcrypto.EVP_CipherInit_ex(
            self._ctx,
            None,
            None,
            None, iv_ptr,
            c_int(CIPHER_ENC_UNCHANGED)
        )
        if not r:
            self.clean()
            raise Exception('can not initialize cipher context')

        AeadCryptoBase.nonce_increment(self)

    def set_tag(self, tag):
        """
        Set tag before decrypt any data (update)
        :param tag: authenticated tag
        :return: None
        """
        tag_len = self._tlen
        r = libcrypto.EVP_CIPHER_CTX_ctrl(
            self._ctx,
            c_int(EVP_CTRL_AEAD_SET_TAG),
            c_int(tag_len), c_char_p(tag)
        )
        if not r:
            self.clean()
            raise Exception('Set tag failed')

    def get_tag(self):
        """
        Get authenticated tag, called after EVP_CipherFinal_ex
        :return: str
        """
        tag_len = self._tlen
        tag_buf = create_string_buffer(tag_len)
        r = libcrypto.EVP_CIPHER_CTX_ctrl(
            self._ctx,
            c_int(EVP_CTRL_AEAD_GET_TAG),
            c_int(tag_len), byref(tag_buf)
        )
        if not r:
            self.clean()
            raise Exception('Get tag failed')
        return tag_buf.raw[:tag_len]

    def final(self):
        """
        Finish encrypt/decrypt a chunk (<= 0x3FFF)
        :return: str
        """
        global buf_size, buf
        cipher_out_len = c_long(0)
        r = libcrypto.EVP_CipherFinal_ex(
            self._ctx,
            byref(buf), byref(cipher_out_len)
        )
        if not r:
            self.clean()
            # print(self._nonce.raw, r, cipher_out_len)
            raise Exception('Finalize cipher failed')
        return buf.raw[:cipher_out_len.value]

    def aead_encrypt(self, data):
        """
        Encrypt data with authenticate tag

        :param data: plain text
        :return: cipher text with tag
        """
        ctext = self.update(data) + self.final() + self.get_tag()
        self.cipher_ctx_init()
        return ctext

    def aead_decrypt(self, data):
        """
        Decrypt data and authenticate tag

        :param data: cipher text with tag
        :return: plain text
        """
        clen = len(data)
        if clen < self._tlen:
            self.clean()
            raise Exception('Data too short')

        self.set_tag(data[clen - self._tlen:])
        plaintext = self.update(data[:clen - self._tlen]) + self.final()
        self.cipher_ctx_init()
        return plaintext


class OpenSSLStreamCrypto(OpenSSLCryptoBase):
    """
    Crypto for stream modes: cfb, ofb, ctr
    """
    def __init__(self, cipher_name, key, iv, op, crypto_path=None):
        OpenSSLCryptoBase.__init__(self, cipher_name, crypto_path)
        key_ptr = c_char_p(key)
        iv_ptr = c_char_p(iv)
        r = libcrypto.EVP_CipherInit_ex(self._ctx, self._cipher, None,
                                        key_ptr, iv_ptr, c_int(op))
        if not r:
            self.clean()
            raise Exception('can not initialize cipher context')
        self.encrypt = self.update
        self.decrypt = self.update

ciphers = {
    'aes-128-cfb': (16, 16, OpenSSLStreamCrypto),
    'aes-192-cfb': (24, 16, OpenSSLStreamCrypto),
    'aes-256-cfb': (32, 16, OpenSSLStreamCrypto),
    'aes-128-ofb': (16, 16, OpenSSLStreamCrypto),
    'aes-192-ofb': (24, 16, OpenSSLStreamCrypto),
    'aes-256-ofb': (32, 16, OpenSSLStreamCrypto),
    'aes-128-ctr': (16, 16, OpenSSLStreamCrypto),
    'aes-192-ctr': (24, 16, OpenSSLStreamCrypto),
    'aes-256-ctr': (32, 16, OpenSSLStreamCrypto),
    'aes-128-cfb8': (16, 16, OpenSSLStreamCrypto),
    'aes-192-cfb8': (24, 16, OpenSSLStreamCrypto),
    'aes-256-cfb8': (32, 16, OpenSSLStreamCrypto),
    'aes-128-cfb1': (16, 16, OpenSSLStreamCrypto),
    'aes-192-cfb1': (24, 16, OpenSSLStreamCrypto),
    'aes-256-cfb1': (32, 16, OpenSSLStreamCrypto),
    'bf-cfb': (16, 8, OpenSSLStreamCrypto),
    'camellia-128-cfb': (16, 16, OpenSSLStreamCrypto),
    'camellia-192-cfb': (24, 16, OpenSSLStreamCrypto),
    'camellia-256-cfb': (32, 16, OpenSSLStreamCrypto),
    'cast5-cfb': (16, 8, OpenSSLStreamCrypto),
    'des-cfb': (8, 8, OpenSSLStreamCrypto),
    'idea-cfb': (16, 8, OpenSSLStreamCrypto),
    'rc2-cfb': (16, 8, OpenSSLStreamCrypto),
    'rc4': (16, 0, OpenSSLStreamCrypto),
    'seed-cfb': (16, 16, OpenSSLStreamCrypto),
    # AEAD: iv_len = salt_len = key_len
    'aes-128-gcm': (16, 16, OpenSSLAeadCrypto),
    'aes-192-gcm': (24, 24, OpenSSLAeadCrypto),
    'aes-256-gcm': (32, 32, OpenSSLAeadCrypto),
    'aes-128-ocb': (16, 16, OpenSSLAeadCrypto),
    'aes-192-ocb': (24, 24, OpenSSLAeadCrypto),
    'aes-256-ocb': (32, 32, OpenSSLAeadCrypto),
}

可以看到,分为三个类,OpenSSLStreamCrypto 流加密类,OpenSSLAeadCrypto AEAD加密类,OpenSSLCryptoBase 基础加密类,前二者继承后者,在这上面重写方法来实现各自所需要的实现的特性。

总结来看,就和个链条一样, encrypt  里有个 Encrypt 类,这个类下调用 crypto 文件夹里各种加密库的类,将其包装好以便使用。

而就是前面这两个对象存在内存泄漏问题,没有被 Python 的垃圾回收机制给回收掉。

当然我们从这个项目的 commit 里也可以看到,之前的维护者做了很多努力,想解决这个问题,比如在抛出异常的时候调用 clean,将对象回收掉,但就下面的反馈来看似乎作用不太大,算是治标不治本。

Python 是判断对象还有没有在被调用来判断这个对象该不该回收的,所以就得从这个方面入手,看看有哪些该标记为没有被调用的对象还在被调用。

首先我想到的就是重写 encrypt 里的 _del_ 方法,这个是 Python 销毁对象的时候系统内部调用的方法。我就尝试重写了 _del_ ,在其中调用 crypto 里对应加密库类的 clean 方法。然后测试运行发现,内存泄露的速度确实减缓了一些,但也还是没有真正的解决问题。

再后来仔细观察,发现 主要就是 OpenSSL 流加密的对象泄露得最快,而 AEAD 的泄露又似乎少了一些。而对于其他的加密库,似乎并没有内存泄露。

再尝试用 objgraph 看了看(其实一开始就应该用这个看一看的),发现是 self.update 这个方法一直被调用着,导致对象无法被释放。

再仔细看看代码,发现为了使代码优雅一些,对于加密的调用分为四个方法 encrypt, decrypt, encrypt_once, decrypt_once 。Encrypt 类分别调用这四个方法,来实现不同的加密操作。

对于流加密来说,encrypt, decrypt, encrypt_once, decrypt_once 其实都是指向所调用的加密库类的 update 方法,在代码中是以下面的形式调用的(省略掉无关的代码了)。

class OpenSSLCryptoBase(object):
    def __init__(self, cipher_name, crypto_path=None):
        self.encrypt_once = self.update
        self.decrypt_once = self.update

class OpenSSLStreamCrypto(OpenSSLCryptoBase):
    def __init__(self, cipher_name, key, iv, op, crypto_path=None):
        self.encrypt = self.update
        self.decrypt = self.update

而对于 AEAD 来说,则是下面这样

class OpenSSLCryptoBase(object):
    def __init__(self, cipher_name, crypto_path=None):
        self.encrypt_once = self.update
        self.decrypt_once = self.update

class OpenSSLAeadCrypto(OpenSSLCryptoBase, AeadCryptoBase):
    def __init__(self, cipher_name, key, iv, op, crypto_path=None):
    ......

在 aead.py 的 AeadCryptoBase 类里:

class AeadCryptoBase(object):

    def __init__(self, cipher_name, key, iv, op, crypto_path=None):

        self.encrypt_once = self.aead_encrypt
        self.decrypt_once = self.aead_decrypt

存在这样的引用,组成了 AEAD 的加密方法的引用。

然后我就怀疑,是不是这种引用上存在一些坑,导致了对象在使用完毕后无法被正确地被系统识别和回收。

尝试将这些引用进行改写,比如

self.encrypt_once = self.update

将其改写为

    def encrypt_once(self, data):
        return self.update(data)

原先是 Encrypt 类调用 加密库类里“包装”好了的加密方法(encrypt, decrypt, encrypt_once, decrypt_once),加密库对外方法根据情况直接引用自身的方法引用方法,比如 在 流加密里 上面这四个函数都是指向自身的 update 方法,AEAD 里则是有两个是自身所拥有的方法,而另外两个则是指向自身的 update 方法。

而改写之后,就不是直接指向了,由改写之后的方法作为中转,在其中再调用各自应该调用的方法。

这样改写之后,经过长时间的测试,就再也没有发现内存泄露了,内存长时间的占用都维持在一个低位水平了。

原因分析,只能就我个人的猜测来说,我觉得是原先的引用方式,是直接引用的,调用到的直接是最终的函数了,这样调用确实是方便,减少了一些代码量。但这样每次调用之后,就需要对每个对象进行彻底的标记和释放,如果不标记,这样链式引用会造成系统无法知道从何处回收起,从而也就一直不被回收了。当然- -如果不嫌麻烦- -而且心够细的话,能在使用之后进行精确的标记,能让系统回收,那也行。

而改写之后,就不存在链式引用了,外部调用的就相当于调用被调用对象的自身方法了,从而系统能在这个对象使用完之后进行正确的回收。

对于自己来说,在以后的开发中也要自己注意一下,尽量不要进行类似的链式引用。