python使用mongo操作

发布于:2024-05-07 ⋅ 阅读:(22) ⋅ 点赞:(0)

目前有个需求,就是把所有sql转为mongo管道查询

知识点

在 MongoDB 中,allowDiskUse 选项应该作为聚合命令的一个选项,而不是聚合管道的一个阶段。allowDiskUse 选项用于允许聚合操作使用磁盘空间来临时存储数据(当聚合操作的数据集非常大,无法在内存中完全处理时)。

$group 阶段:

这个阶段按照 province 字段的值对数据进行分组。
对于每个分组(即每个省份),它会收集所有不同的 $sha1 字段值,并将它们放入一个集合(set)中,因为这个阶段使用了 $addToSet 累加器。由于集合(set)中的元素是唯一的,所以每个 $sha1 值只会被添加到集合中一次。
这个集合被存储在新的 cert_count 字段中,但实际上它包含了 $sha1 的值,而不是真正的“证书数量”。

$addFields 阶段:

这个阶段是对上一阶段输出文档的一个处理。
它使用了 $size 表达式来计算 cert_count 字段(实际上是一个集合)中的元素数量,并将结果仍然保存在 cert_count 字段中。
经过这个阶段后,cert_count 字段现在包含了每个省份中不同的 $sha1 值的数量。虽然名称还是 cert_count,但现在它实际上表示的是每个省份中不同 $sha1 值的数量,而不是直接的“证书数量”。

需求一:目前的需求是根据省份分组求唯一证书的数量,所以需要去重

原有sql
省:证书 分组统计

     query = f"""
                        SELECT province, COUNT(DISTINCT(sha1)) AS cert_count
                        FROM {cls.table_name}
                        Group BY province
                        ORDER BY cert_count DESC
                        """

去重前

db.cert_info.aggregate([
   {  
    '$group': {  
        '_id': '$province',  
        'cert_count': {  
            '$sum': 1  
        }  
    }  
},
   
    {
        '$sort': {
            'cert_count': - 1
        }
    },
    {
        '$limit': 10
    }
], {
    allowDiskUse: true
})

结果
在这里插入图片描述

去重后

第一种写法:

db.cert_info.aggregate([  
    {  
        '$group': {  
            '_id': {  
                'province': '$province',  
                'sha1': '$sha1'  
            },  
            // 由于我们只需要去重计数,所以这里不需要额外的累加器  
            // 我们只是简单地按 province 和 sha1 组合进行分组  
        }  
    },  
    {  
        '$group': {  
            '_id': '$_id.province', // 按 province 重新分组  
            'cert_count': {  
                '$sum': 1 // 计算每个 province 的唯一 sha1 的数量  
            }  
        }  
    },  
    {  
        '$sort': {  
            'cert_count': -1 // 按证书数量降序排序  
        }  
    },  
    {  
        '$limit': 10 // 限制结果集大小为 10  
    }  
], {  
    allowDiskUse: true // 允许聚合操作使用磁盘空间  
});

结果
在这里插入图片描述

第二种写法:

db.cert_info.aggregate([
    {
        '$group': {
            '_id': '$province',
            'cert_count': {
                '$addToSet': '$sha1'
            }
        }
    },
    {
        '$addFields': {
            'cert_count': {
                '$size': '$cert_count'
            }
        }
    },
    {
        '$sort': {
            'cert_count': - 1
        }
    },
    {
        '$limit': 10
    }
], {
    allowDiskUse: true
})

查询结果
在这里插入图片描述

需求二:按照行业分组,统计唯一证书数量,过滤掉其他和未知的行业

原有sql

 query = f"""
                    SELECT yb_industry, COUNT(DISTINCT(sha1)) AS cert_count
                    FROM {cls.table_name}
                    WHERE yb_industry NOT IN ['其他','未知']
                    Group BY yb_industry
                    ORDER BY cert_count DESC
                  """

mongo

db.cert_info.aggregate([{
            '$match': {
                'yb_industry': {
                    '$nin': ['其他', '未知']  
                }
            }
        },
        {
            '$group': {
                '_id': '$yb_industry',  
                'cert_count': {
                    '$addToSet': '$sha1'  
                }
            }
        },
        {
            '$addFields': {
                'cert_count': {
                    '$size': '$cert_count'  
                }
            }
        },
        {
            '$sort': {
                'cert_count': -1  
            }
        },
        {
            '$limit': 10
        }], {allowDiskUse: true})


在这里插入图片描述

再加个省份分组

province_pipeline = [
        {
            '$match': {
                'domestic': True,
                'yb_industry': {
                    '$nin': ['其他', '未知']
                }
            }
        },
        {
            '$addFields': {
                'province_temp': '$province'
            }
        },
        {
            '$group': {
                '_id': {
                    'yb_industry': '$yb_industry',
                    'province': '$province_temp'
                },
                'sha1_set': {
                    '$addToSet': '$sha1'
                }
            }
        },
        {
            '$addFields': {
                'province': '$_id.province',
                'cert_count': {
                    '$size': {
                        '$setUnion': ['$sha1_set', []]
                    }
                }
            }
        },
        {
            '$sort': {
                'cert_count': -1
            }
        },
        {
            '$project': {
                '_id': 0,
                'yb_industry': '$_id.yb_industry',
                'province': '$_id.province',
                'cert_count': 1
            }
        }
    ]

查询的时候按照省份过滤,取top10

# 使用filter函数过滤data列表
                filtered_data = list(filter(lambda item: item['province'] == province, data))
                top10_data = filtered_data[:10]
                return Response(top10_data)

需求三:查询过期的证书

原有sql

SELECT province, COUNT(DISTINCT(sha1)) AS cert_count
                       FROM {cls.table_name}
                       WHERE end < NOW()
                       Group BY province
                       ORDER BY cert_count DESC