python脚本实战

发布于:2024-05-10 ⋅ 阅读:(26) ⋅ 点赞:(0)

1. 导入txt文件内容到mongo

主要涉及io操作、pymongo操作、字符串操作、时间操作

字符串操作

# 去除空格
line1= line.strip()
# 字符串切割
arr=line1.split(FILE_SPLIT)
# 字符串拼接
name=PREFIX+arr[0]

时间操作

time.time()拿到带小数位当前秒数时间戳,乘以1000拿到带小数的毫秒时间戳,然后浮点型强转整形,再转成mongo的NumberLong

 info['createTime']=bson.Int64(int(time.time()*1000));

io导入

with open(FILE_PATH, 'r',encoding=FILE_ENCODING) as f:
        line = f.readline()
        while line:
            # 处理逻辑
            # 处理逻辑完毕后继续读取下一行
            line = f.readline()
    return;

入口函数

入口函数,是程序的入口。通常情况下,程序从入口函数开始执行,并在程序的其他部分中调用其他函数或方法。Python程序的入口函数为main函数。

if name == ‘main’:
main()

在Python中,当我们执行一个.py文件的时候,解释器会首先读取整个文件,并且把默认的模块变量在全局命名空间中识别出来。然后,根据Python文件中是否存在main函数,来决定是否执行main函数。

待导入的txt文件

张三,18,98
李四,29,87

完整脚本

#! -*- coding:utf-8 -*-
import pymongo
import os
import bson
import time
# 安装环境依赖 Python3.6
# ====以下参数配置,根据实际情况修改====================
MONGO_URL = "mongodb://root:123456@10.1.69.30:27019/study?authSource=admin"
#数据库
DATABASE = "study"
# 导入文件的路径
FILE_PATH = "./source.txt"
# 文件的编码格式
FILE_ENCODING = "utf-8"
# 文件的分隔符
FILE_SPLIT = ",";
# 前缀
PREFIX = "TEST";
# 数据格式
student={
    "name" : "张三",
    "age" : 13,
    "score" : 98,
    "createById":"147258",
    "createTime" : bson.Int64(1698740329762)
}

# ====以上是参数配置,根据实际情况修改====================
num=0
CLIENT = pymongo.MongoClient(MONGO_URL)

def getStudnet(name):
    ta = _mongoClient.get_database(DATABASE).get_collection("Student");
    object = ta.find_one({"name": name});
    return object;

def addStudnet(info):
    ta = _mongoClient.get_database(DATABASE).get_collection("Student");
    ta.insert_one(info);

def init():
    info={};
    info['createById']=student['createById'];
    return info;

def start_business(num):
    with open(FILE_PATH, 'r',encoding=FILE_ENCODING) as f:
        line = f.readline()
        while line:
            line1= line.strip()
            # print(line1)
            arr=line1.split(FILE_SPLIT)
            name=PREFIX+arr[0]
            # print(name)
            info=getStudnet(name)
            # print(info)
            if(info == None):
                info=init();
                info['name']=name;
                info['age']=arr[1];
                info['socre']=arr[2];
                info['createTime']=bson.Int64(int(time.time()*1000));
                addStudnet(info)
                info=None
                num+=1;
            else:
                print(name+"已存在,不再导入")
            line = f.readline()
    return num;

if __name__ == '__main__':
    start=int(time.time())
    _mongoClient = CLIENT;
    total=start_business(num);
    _mongoClient.close();
    end = int(time.time());
    print("总共导入"+str(total)+"条数据,耗时"+str(end-start)+"秒")

执行脚本
python3 study.py

2. 从第三方库查询拿到id然后查询己方库导出

主要涉及http操作、pymongo操作、字符串操作、三目运算符、list切片、json解析

python发送post,timeout设置连接超时和读取超时
requests.post(SENSORS_URL, headers=headers, data=data,timeout=(CONNECTION_TIMEOUT,READ_TIMEOUT))

json解析
result_dict = json.loads(str(result));

三目运算符
total= len1//PAGE_SIZE_CRM if len1%PAGE_SIZE_CRM0 else len1//PAGE_SIZE_CRM+1
等价于java:total= len1%PAGE_SIZE_CRM
0? len1//PAGE_SIZE_CRM:len1//PAGE_SIZE_CRM+1

list切片
参考 Python 列表(List)基本操作
使用切片访问列表的格式为 list_name[strat : end : step] ,其中,start 表示起始索引,end 表示结束索引,step 表示步长。step不写默认为1。如果切片到了最后一次,则不写end.
while i<total:
if i+1==total:
list1=list0[iPAGE_SIZE_CRM:];
else:
list1=list0[i
PAGE_SIZE_CRM:(i+1)*PAGE_SIZE_CRM];

pymongo操作
只返回部分字段,操作同mongo js 脚本,1为展示,0为不展示。
collection=_mongoClient.get_database(DATABASE).get_collection(“Student”);
with collection.find({“id”:{“$in”:list1}},{ “id”: 1, “name”: 1, “age”: 1 }) as cursor:
for stu in cursor:
print(stu.get(‘id’)+“,”+stu.get(‘name’)+“,”+stu.get(‘age’));

完整脚本

#! -*- coding:utf-8 -*-
import pymongo
import requests
import json
from datetime import datetime, timezone, timedelta
from bson import ObjectId

# 安装环境依赖 Python3.6
# ====以下参数配置,根据实际情况修改====================
MONGO_URL = "mongodb://root:123456@10.1.69.30:27019/study?authSource=admin"
SENSORS_URL = "https://scmanager.study.com/api/sql/query?token=123456789&project=default"
# 数据库
DATABASE = "study"
# 每次查询神策的条数
PAGE_SIZE = 10000
# 每次查询study的条数
PAGE_SIZE_CRM = 100
# 连接超时
CONNECTION_TIMEOUT = 30;
# 读取超时
READ_TIMEOUT = 30;
# ====以上是参数配置,根据实际情况修改====================
CLIENT = pymongo.MongoClient(MONGO_URL)

def getMongoDBClient():
    CLIENT = pymongo.MongoClient(MONGO_URL)
    return CLIENT

# 请求神策
def request_sensors(table, page, pageSize=100):
    # curl
    # 'https://scmanager.study.com/api/sql/query?token=123456789&project=production' - X
    # POST - -data - urlencode
    # "q=select distinct_id,base_time from student limit 10;" --data-urlencode
    # "format=json"
    headers = {
        'Content-Type': 'application/x-www-form-urlencoded',
    }
    start = (page - 1) * pageSize
    end = pageSize;
    data = f'q=select name from {table} order by id limit {start},{end} ;&format=json'
    resp = requests.post(SENSORS_URL, headers=headers, data=data,timeout=(CONNECTION_TIMEOUT,READ_TIMEOUT))
    return resp;

# 分页请求神策
def start_business():
    print("id,name,age")
    page = 1
    while (True):
        try:
            response = request_sensors("users",page, PAGE_SIZE);
            if (response.status_code != 200):
                print(f"status_codePAGE_SIZE:{PAGE_SIZE},page:{page}");
                break;
            current_page_size = format_sensors_response(response.content);
            if (current_page_size < PAGE_SIZE):
                break;
            page = page + 1;
        except Exception as ex:
            break;
    return;

# 解析神策返回并打印目标数据
def format_sensors_response(content):
    list0=[];
    result_content = str(content, "utf-8").strip();
    result_list = result_content.split('\n');
    for result in result_content.split('\n'):
        result_dict = json.loads(str(result));
        id = result_dict['id'];
        if(id != None):
            list0.append(id);
    len1 = len(list0)
    if len1>0:
        list1=[];
        total= len1//PAGE_SIZE_CRM if len1%PAGE_SIZE_CRM==0 else len1//PAGE_SIZE_CRM+1
        i=0
        while i<total:
            if i+1==total:
                list1=list0[i*PAGE_SIZE_CRM:];
            else:
                list1=list0[i*PAGE_SIZE_CRM:(i+1)*PAGE_SIZE_CRM];
            with _mongoClient.get_database(DATABASE).get_collection("Student").find({"id":{"$in":list1}},{ "id": 1, "name": 1, "age": 1 }) as cursor:
                for stu in cursor:
                    print(stu.get('id')+","+stu.get('name')+","+stu.get('age'));
            list1.clear();
            i=i+1;
    return len(result_list);

if __name__ == '__main__':
    _mongoClient = getMongoDBClient();
    start_business();
    _mongoClient.close();

执行脚本
python3 study.py >study.csv

3. 踩坑

python没有大括号来控制代码段,而是使用缩进来控制代码段。

python中的True和False首字母必须大写,在pymongo中涉及boolean类型也是使用首字母大写

SyntaxError: invalid syntax:语法错误,极有可能是末尾冒号:忘记写了

如果脚本window下能执行,linux下不能执行,请先检测两个环境的版本是否一致,如果一致还报错,则添加打印日志,看具体是哪一行执行阻塞


网站公告

今日签到

点亮在社区的每一天
去签到