Python学习之——序列化与反序列化

发布于:2025-07-18 ⋅ 阅读:(19) ⋅ 点赞:(0)

yaml & json & xml

YAML & JSON & XML 如何选择

yaml

Python的PyYAML模块详解

pip install pyyaml
import yaml

# 自定义 Loader 类
class SafePersonLoader(yaml.SafeLoader):
    """SafeLoader subclass that carries this module's custom tag constructors.

    The ``!Person`` constructor is registered on this subclass, so the shared
    ``yaml.SafeLoader`` class itself is left untouched.
    """

  
# 定义一个示例类
class Person:
    """Minimal example object holding a name and an age."""

    def __init__(self, name, age):
        self.name, self.age = name, age

    def __repr__(self):
        return "Person(name='{}', age={})".format(self.name, self.age)


# 自定义序列化方法
def person_representer(dumper, person):
    """Represent *person* as a YAML mapping carrying the custom ``!Person`` tag."""
    fields = {'name': person.name, 'age': person.age}
    return dumper.represent_mapping('!Person', fields)


# 自定义反序列化方法
def person_constructor(loader, node):
    """Rebuild a ``Person`` from the mapping node tagged ``!Person``.

    The mapping's keys are passed to ``Person`` as keyword arguments.
    """
    return Person(**loader.construct_mapping(node, deep=True))


class YamlUtil:
    """Convenience wrapper around PyYAML for plain data and ``Person`` objects.

    Every load goes through ``SafePersonLoader``, so only the tags that loader
    knows about (including the custom ``!Person`` tag) are constructed.
    """

    def __init__(self):
        # Register how Person instances are serialized by yaml.dump().
        yaml.add_representer(Person, person_representer)
        # Register how the custom !Person tag is deserialized.
        SafePersonLoader.add_constructor('!Person', person_constructor)

    def dump_file(self, data, file_path):
        """Write *data* as block-style YAML to *file_path* (UTF-8)."""
        with open(file_path, "w", encoding="utf-8") as fp:
            yaml.dump(data, fp, default_flow_style=False)

    def dump_str(self, data):
        """Return *data* serialized as a YAML string."""
        return yaml.dump(data)

    def load_file(self, file_path):
        """Parse the YAML document stored in *file_path* (UTF-8)."""
        with open(file_path, "r", encoding="utf-8") as fp:
            return yaml.load(fp, Loader=SafePersonLoader)

    def load_str(self, data):
        """Parse the YAML document held in the string *data*."""
        # yaml.load() without an explicit Loader raises TypeError on
        # PyYAML >= 6, so use the same loader as every other load method.
        return yaml.load(data, Loader=SafePersonLoader)

    def dump_obj_str(self, obj):
        """Return *obj* serialized as a YAML string (same as ``dump_str``)."""
        return self.dump_str(obj)

    def dump_obj_file(self, obj, file_path):
        """Write *obj* as YAML to *file_path* (same as ``dump_file``)."""
        self.dump_file(obj, file_path)

    def load_obj_str(self, yaml_str):
        """Parse *yaml_str* (same as ``load_str``)."""
        return self.load_str(yaml_str)

    def load_obj_file(self, file_path):
        """Parse the YAML file at *file_path* (same as ``load_file``)."""
        return self.load_file(file_path)


if __name__ == "__main__":
    util = YamlUtil()

    # 1. Round-trip a plain dict through a YAML file.
    plain_path = "./TestYaml.yaml"
    util.dump_file(
        {
            "name": "test",
            "age": 18,
            "ids": [1, 2, 3, [1, 2, 3, 4]],
        },
        plain_path,
    )
    print(util.load_file(plain_path))

    # 2. Round-trip a custom object, both via a string and via a file.
    original = Person("test", 18)
    dumped = util.dump_obj_str(original)
    print(dumped)
    obj_path = "./TestYamlObj.yaml"
    util.dump_obj_file(original, obj_path)
    print(util.load_obj_str(dumped))
    print(util.load_obj_file(obj_path))

json

一文看懂Python类型数据JSON序列化

python的dict和json数据有什么区别?

区别

  • Python的dict是一种数据结构,JSON是一种数据格式。
  • dict的key可以是任意可hash对象,json的key只能是字符串。{(1,2):1} 在python里是合法的,因为tuple是hashable type; {[1,2]:1} 在python里会抛出 TypeError: unhashable type: 'list',因为list不可hash。
  • 形式上有些相像,但json是纯文本的,无法直接操作。
  • Python中的字符串既可以用单引号也可以用双引号(打印dict时默认显示单引号),json强制规定字符串使用双引号。
  • dict里可以嵌套tuple, json里只有array。 json.dumps({1:2}) 的结果是 {"1": 2}, json.dumps((1,2)) 的结果是 [1, 2]
  • json: true|false|null ; dict:True|False|None

联系

  • dict 存在于内存中,可以被序列化成 json 格式的数据(string),之后这些数据就可以传输或者存储了。

Json数据类型和Python数据类型的对应关系如下:

  • Python 编码为 Json 类型转换对应表
Python Json
dict object
list, tuple array
str string
int, float, int- & float-derived Enums number
True true
False false
None null
  • Json 解码为 Python 类型转换对应表:
Json Python
object dict
array list
string str
number (int) int
number (real) float
true True
false False
null None

Python自带Json库

Python自带Json库用于序列化和反序列化,主要包含了dumps, loads, dump和load四种方法,其作用分别如下所示:

json.dump	将Python对象按照JSON格式序列化到文件中
json.dumps	将Python对象处理成JSON格式的字符串

json.load	将文件中的JSON数据反序列化成对象
json.loads	将字符串的内容反序列化成Python对象

很多python数据类型,如set, datetime,自定义的类等不能够直接通过dumps来序列化,可以通过如下两种方法解决

  1. 通过数据类型转换函数实现
  2. 通过继承JSONEncoder和JSONDecoder类实现

测试示例

# _*_ coding: UTF-8 _*_

import json
from datetime import datetime

# 定义一个示例类
class Person:
    """Minimal example object holding a name and an age."""

    def __init__(self, name, age):
        self.name, self.age = name, age

    def __repr__(self):
        return "Person(name='{}', age={})".format(self.name, self.age)


def custom_dump_func(obj):
    """Convert an object ``json`` cannot encode into a tagged, encodable dict.

    Sets, datetimes and ``Person`` instances become
    ``{"class_name": <tag>, "data": <payload>}``. Any other type raises
    ``TypeError``.
    """
    if isinstance(obj, set):
        tag, payload = "set", list(obj)
    elif isinstance(obj, datetime):
        tag, payload = "datetime", obj.strftime("%Y/%m/%d %H:%M:%S")
    elif isinstance(obj, Person):
        tag, payload = "Person", {"name": obj.name, "age": obj.age}
    else:
        raise TypeError
    return {"class_name": tag, "data": payload}


def custom_load_func(dct):
    """Turn a tagged dict back into the object it stands for.

    Dicts without a ``"class_name"`` key, or with a tag other than ``set``,
    ``datetime`` or ``Person``, are returned unchanged.
    """
    if "class_name" not in dct:
        return dct
    tag = dct["class_name"]
    if tag == "set":
        return set(dct["data"])
    if tag == "datetime":
        return datetime.strptime(dct["data"], "%Y/%m/%d %H:%M:%S")
    if tag == "Person":
        fields = dct["data"]
        return Person(fields["name"], fields["age"])
    return dct


class CustomEncoder(json.JSONEncoder):
    """JSON encoder that wraps sets, datetimes and ``Person`` objects in tagged dicts."""

    def default(self, obj):
        """Return ``{"class_name": <tag>, "data": <payload>}`` for supported types.

        Anything else is handed to ``json.JSONEncoder.default``.
        """
        if isinstance(obj, set):
            tag, payload = "set", list(obj)
        elif isinstance(obj, datetime):
            tag, payload = "datetime", obj.strftime("%Y/%m/%d %H:%M:%S")
        elif isinstance(obj, Person):
            tag, payload = "Person", {"name": obj.name, "age": obj.age}
        else:
            return json.JSONEncoder.default(self, obj)
        return {"class_name": tag, "data": payload}


class CustomDecoder(json.JSONDecoder):
    """JSON decoder that rebuilds sets, datetimes and ``Person`` objects from tagged dicts."""

    def __init__(self, *, object_hook=None, parse_float=None,
                 parse_int=None, parse_constant=None, strict=True,
                 object_pairs_hook=None):
        # The object_hook argument is accepted but not forwarded: this
        # class's own object_hook method is always the one installed.
        super().__init__(
            object_hook=self.object_hook,
            parse_float=parse_float,
            parse_int=parse_int,
            parse_constant=parse_constant,
            strict=strict,
            object_pairs_hook=object_pairs_hook,
        )

    def object_hook(self, o):
        """Map a tagged dict to its object; return any other dict unchanged."""
        if "class_name" not in o:
            return o
        tag = o["class_name"]
        if tag == "set":
            return set(o["data"])
        if tag == "datetime":
            return datetime.strptime(o["data"], "%Y/%m/%d %H:%M:%S")
        if tag == "Person":
            fields = o["data"]
            return Person(fields["name"], fields["age"])
        return o


class JsonUtil:
    """Thin convenience layer over the ``json`` module's dump/load functions.

    ``dump_func`` is forwarded as ``default=``, ``load_func`` as
    ``object_hook=``, and ``cls`` as the encoder/decoder class.
    """

    def dump_file(self, data, file_path, dump_func=None, cls=None, indent=4):
        """Serialize *data* as JSON into *file_path* (UTF-8)."""
        with open(file_path, "w", encoding="utf-8") as out_stream:
            json.dump(data, out_stream, cls=cls, default=dump_func, indent=indent)

    def dump_str(self, data, dump_func=None, cls=None, indent=4):
        """Return *data* serialized as a JSON string."""
        text = json.dumps(data, cls=cls, default=dump_func, indent=indent)
        return text

    def load_file(self, file_path, load_func=None, cls=None):
        """Parse the JSON document stored in *file_path* (UTF-8)."""
        with open(file_path, "r", encoding="utf-8") as in_stream:
            parsed = json.load(in_stream, cls=cls, object_hook=load_func)
        return parsed

    def load_str(self, data, load_func=None, cls=None):
        """Parse the JSON document held in the string *data*."""
        parsed = json.loads(data, cls=cls, object_hook=load_func)
        return parsed


if __name__ == "__main__":
    util = JsonUtil()
    sample = {
        "x": 1,
        "y": 2,
        "z": 3,
        "set": {"apple", "banana", "orange"},
        "datetime": datetime.now(),
        "person": Person("test", 18),
    }

    # Approach 1: plain hook functions (default= / object_hook=).
    hook_text = util.dump_str(sample, dump_func=custom_dump_func)
    print(hook_text)
    hook_path = "./TestJson.json"
    util.dump_file(sample, hook_path, dump_func=custom_dump_func)
    print(util.load_str(hook_text, load_func=custom_load_func))
    print(util.load_file(hook_path, load_func=custom_load_func))

    # Approach 2: JSONEncoder / JSONDecoder subclasses (cls=).
    cls_text = util.dump_str(sample, cls=CustomEncoder)
    print(cls_text)
    cls_path = "./TestJson2.json"
    util.dump_file(sample, cls_path, cls=CustomEncoder)
    print(util.load_str(cls_text, cls=CustomDecoder))
    print(util.load_file(cls_path, cls=CustomDecoder))

xml

Python XML 解析

一个综合示例

import yaml
import json
# import xml
import xmltodict


def collect_subclasses(cls):
    """Class decorator that makes *cls* record every subclass by name.

    After decoration the class has a ``subclasses`` dict (class name ->
    class), an ``__init_subclass__`` hook that fills it, and a
    ``get_subclass(name)`` classmethod that returns the registered class or
    ``None``.
    """

    def __init_subclass__(subclass, **kwargs):
        # Continue the regular hook chain, then register the new subclass.
        super(cls, subclass).__init_subclass__(**kwargs)
        cls.subclasses[subclass.__name__] = subclass

    def get_subclass(klass, subcls_name):
        registry = klass.subclasses
        return registry.get(subcls_name)

    cls.subclasses = {}
    cls.get_subclass = classmethod(get_subclass)
    cls.__init_subclass__ = classmethod(__init_subclass__)
    return cls


# 自定义 Loader 类
class SafePersonLoader(yaml.SafeLoader):
    """SafeLoader subclass used by this example when loading YAML text."""


@collect_subclasses
class BaseClass:
    """Base class for objects that serialize themselves to YAML, JSON or XML.

    ``serialize`` turns ``to_dict()`` into text; ``unserialize`` parses text,
    looks up the subclass named by its ``"class_name"`` field and fills a new
    instance through ``from_dict``.
    """

    # Bookkeeping attributes set in __init__; they hold bound methods and
    # must never end up in (or be overwritten from) serialized data.
    _INTERNAL_FIELDS = ("dump_func", "load_func")

    def __init__(self, **kwargs):
        # Format name -> bound method that dumps a dict to text.
        self.dump_func = {
            "yaml": self.dump_to_yaml,
            "json": self.dump_to_json,
            "xml": self.dump_to_xml,
        }
        # Format name -> bound method that parses text into a dict.
        self.load_func = {
            "yaml": self.load_from_yaml,
            "json": self.load_from_json,
            "xml": self.load_from_xml,
        }

    def to_dict(self):
        """Return the instance's data attributes plus a ``"class_name"`` entry.

        The dispatch tables are left out because bound methods cannot be
        serialized, and ``"class_name"`` is included because ``unserialize``
        needs it to pick the class to rebuild.
        """
        data_dict = {"class_name": type(self).__name__}
        data_dict.update(
            (key, value)
            for key, value in self.__dict__.items()
            if key not in self._INTERNAL_FIELDS
        )
        return data_dict

    def from_dict(self, data_dict):
        """Copy the entries of *data_dict* onto this instance as attributes.

        ``"class_name"`` and the dispatch tables are skipped so that loaded
        data cannot replace them.
        """
        self.__dict__.update(
            (key, value)
            for key, value in data_dict.items()
            if key != "class_name" and key not in self._INTERNAL_FIELDS
        )

    def serialize(self, type_name="yaml"):
        """Return this object serialized in the format named *type_name*.

        Raises:
            ValueError: if *type_name* is not ``"yaml"``, ``"json"`` or ``"xml"``.
        """
        dump_func = self.dump_func.get(type_name)
        if dump_func is None:
            raise ValueError(f"Invalid type_name: {type_name}")
        data_dict = self.to_dict()
        return dump_func(data_dict)

    def unserialize(self, data_str, type_name="yaml"):
        """Parse *data_str* and return a new instance of the class it names.

        Raises:
            ValueError: if *type_name* is unknown, or the ``"class_name"`` in
                the data is not a registered subclass.
            KeyError: if the parsed data has no ``"class_name"`` entry.
        """
        load_func = self.load_func.get(type_name)
        if load_func is None:
            raise ValueError(f"Invalid type_name: {type_name}")
        data_dict = load_func(data_str)
        class_name = data_dict["class_name"]
        sub_cls = BaseClass.get_subclass(class_name)
        if sub_cls is None:
            # Without this check the code would try to call None.
            raise ValueError(f"Unknown class_name: {class_name}")
        sub_obj = sub_cls()
        sub_obj.from_dict(data_dict)
        return sub_obj

    def dump_to_yaml(self, data_dict):
        """Return *data_dict* as a YAML string."""
        return yaml.dump(data_dict)

    def load_from_yaml(self, data_str):
        """Parse a YAML string with ``SafePersonLoader``."""
        data_dict = yaml.load(data_str, Loader=SafePersonLoader)
        return data_dict

    def dump_to_json(self, data_dict):
        """Return *data_dict* as a JSON string."""
        return json.dumps(data_dict)

    def load_from_json(self, data_str):
        """Parse a JSON string."""
        return json.loads(data_str)

    def dump_to_xml(self, data_dict):
        """Return *data_dict* as pretty-printed XML.

        A dict that does not have exactly one top-level key is wrapped in a
        ``<root>`` element first.
        """
        if len(data_dict) != 1:
            data_dict = {"root": data_dict}
        return xmltodict.unparse(data_dict, pretty=True)

    def load_from_xml(self, data_str):
        """Parse an XML string, unwrapping a non-empty ``root`` element if present.

        NOTE(review): xmltodict appears to return element text as strings, so
        numeric fields may not come back with their original types - confirm.
        """
        data_dict = xmltodict.parse(data_str)
        # dump_to_xml only adds <root> for multi-key dicts, so it may be absent.
        root = data_dict.get("root")
        if root:
            data_dict = root
        return data_dict


class SubClass1(BaseClass):
    """Example subclass that serializes exactly ``name``, ``age`` and ``ids``."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.name, self.age = "test", 18
        self.ids = [1, 2, 3, [1, 2, 3, 4]]

    def to_dict(self):
        """Return the class name followed by the three data fields."""
        data_dict = {"class_name": self.__class__.__name__}
        for field in ("name", "age", "ids"):
            data_dict[field] = getattr(self, field)
        return data_dict

    def from_dict(self, data_dict):
        """Set the three data fields from *data_dict*; each key is required."""
        for field in ("name", "age", "ids"):
            setattr(self, field, data_dict[field])


class SubClass2(BaseClass):
    """Example subclass that adds nothing to ``BaseClass``."""

      
if __name__ == "__main__":
    # 1. Subclass registry: look up SubClass1 by name.
    print(BaseClass.get_subclass("SubClass1"))

    # 2. Round-trip a custom object.
    # Sender side: serialize.
    outgoing = SubClass1()
    payload = outgoing.serialize(type_name="json")
    # Receiver side: rebuild the object from the payload.
    incoming = BaseClass().unserialize(payload, type_name="json")
    print(incoming)

pickle & msgpack & marshal

pickle

The Python pickle Module: How to Persist Objects in Python
Python的pickle模块详解

msgpack

官网:https://msgpack.org/
Python版本:https://github.com/aviramha/ormsgpack/tree/master
使用实例:MessagePack简介及使用:一种有效的二进制序列化格式

marshal

官网:marshal — Internal Python object serialization
使用实例:Python 中的 marshal 模块

自定义导出py文件

一个导出py文件的示例

# -*- coding: utf-8 -*-
import os
import json


class TestExport:
    """Export Python data as a formatted ``export_data = ...`` block in a .py file.

    The block is delimited by ``EXPORT_START`` / ``EXPORT_END`` marker lines so
    that a later export can replace it while keeping the rest of the file.
    """

    EXPORT_START = '# ----------------export-begin----------------'
    EXPORT_END = '# ----------------export-end----------------\n'
    LINE_INDENT = '    '

    def __init__(self):
        # Dispatch on the exact type; every other type goes to _dump_default.
        self._dump_handler = {
            bytes: self._dump_bytes,
            str: self._dump_string,
            list: self._dump_list,
            dict: self._dump_dict,
        }

    def dump_to_str(self, py_data):
        """Return the full export block (markers included) for *py_data*."""
        return "{}\nexport_data = {}\n{}".format(
            self.EXPORT_START,
            self._dumps(py_data),
            self.EXPORT_END,
        )

    def dump_to_file(self, py_data, file_path):
        """Write the export block for *py_data* into *file_path*.

        If the file already holds a complete block (start marker followed by
        an end marker), only that block is replaced. Otherwise the new block
        is written first and the previous file content is kept after it.
        Directory-creation and read errors are reported with ``print`` and
        the export then continues.
        """
        abs_file_path = os.path.abspath(file_path)
        file_dir = os.path.dirname(abs_file_path)
        try:
            # exist_ok=True: an already existing directory is not an error.
            os.makedirs(file_dir, exist_ok=True)
        except OSError as e:
            print(f"创建目录 {file_dir} 时出错: {e}")

        old_file_content = ""
        if os.path.exists(abs_file_path):
            try:
                with open(abs_file_path, "r", encoding="utf-8") as fp:
                    old_file_content = fp.read()
            except (OSError, UnicodeError) as e:
                print(f"读取文件 {abs_file_path} 时出错: {e}")
        dump_data = self.dump_to_str(py_data)

        start_index = old_file_content.find(self.EXPORT_START)
        # Keep the raw find() result: adding len(EXPORT_END) before testing
        # against -1 would hide a missing end marker. The end marker only
        # counts when it comes after the start marker.
        end_marker_index = -1
        if start_index != -1:
            end_marker_index = old_file_content.find(
                self.EXPORT_END, start_index + len(self.EXPORT_START)
            )
        if end_marker_index != -1:
            # Keep what precedes the start marker and what follows the end marker.
            prefix = old_file_content[:start_index]
            suffix = old_file_content[end_marker_index + len(self.EXPORT_END):]
            dump_data = prefix + dump_data + suffix
        else:
            # Markers incomplete: keep the old content after the new block.
            dump_data += old_file_content
        with open(abs_file_path, "w", encoding="utf-8") as fp:
            fp.write(dump_data)

    def _dumps(self, in_data):
        """Return the source-text form of *in_data*."""
        dump_func = self._dump_handler.get(type(in_data), self._dump_default)
        return dump_func(in_data)

    def _dump_default(self, in_data):
        """Fallback for types without a dedicated handler: ``str(in_data)``."""
        return str(in_data)

    def _dump_string(self, in_str):
        """Return *in_str* as a double-quoted literal, keeping non-ASCII text readable."""
        return json.dumps(in_str, ensure_ascii=False)

    def _dump_bytes(self, in_bytes):
        """Return *in_bytes*, decoded as UTF-8, as a double-quoted text literal."""
        return json.dumps(in_bytes.decode("utf-8"), ensure_ascii=False)

    def _indent_lines(self, text):
        """Return *text* with every line prefixed by one indent level."""
        return "\n".join(self.LINE_INDENT + line for line in text.split("\n"))

    def _sorted_keys(self, in_dict):
        """Return the keys of *in_dict* in a deterministic order."""
        try:
            return sorted(in_dict)
        except TypeError:
            # Keys of different types cannot be compared with each other;
            # order them by type name and repr instead of failing.
            return sorted(in_dict, key=lambda k: (type(k).__name__, repr(k)))

    def _dump_list(self, in_list):
        """Return *in_list* as a multi-line literal, one element per line."""
        if not in_list:
            return "[]"
        parts = ["["]
        # Nested containers span several lines; all of them get indented.
        parts.extend(self._indent_lines(self._dumps(v) + ",") for v in in_list)
        parts.append("]")
        return "\n".join(parts)

    def _dump_dict(self, in_dict):
        """Return *in_dict* as a multi-line literal with sorted keys."""
        if not in_dict:
            return "{}"
        parts = ["{"]
        for k in self._sorted_keys(in_dict):
            k_str = self._dumps(k)
            # The first line of the value follows the key on the same line;
            # only its continuation lines are indented.
            v_str = self._dumps(in_dict[k]).replace("\n", "\n" + self.LINE_INDENT)
            parts.append("{}{}: {},".format(self.LINE_INDENT, k_str, v_str))
        parts.append("}")
        return "\n".join(parts)


if __name__ == "__main__":
    # Sample data mixing nested lists and dicts with scalars.
    sample = {
        "a_list": [1, 2, 3, [1, 2, 3, 4], {"x": "x1"}],
        "b_str": "b",
        "d": 4,
        "c_dict": {
            "c": 3,
        },
    }
    TestExport().dump_to_file(sample, "./TestExport.py")

导出示例
在这里插入图片描述


网站公告

今日签到

点亮在社区的每一天
去签到