1. 导入txt文件内容到mongo
主要涉及io操作、pymongo操作、字符串操作、时间操作
字符串操作
# 去除空格
line1= line.strip()
# 字符串切割
arr=line1.split(FILE_SPLIT)
# 字符串拼接
name=PREFIX+arr[0]
时间操作
time.time()拿到带小数位当前秒数时间戳,乘以1000拿到带小数的毫秒时间戳,然后浮点型强转整形,再转成mongo的NumberLong
info['createTime']=bson.Int64(int(time.time()*1000));
io导入
with open(FILE_PATH, 'r',encoding=FILE_ENCODING) as f:
line = f.readline()
while line:
# 处理逻辑
# 处理逻辑完毕后继续读取下一行
line = f.readline()
return;
入口函数
入口函数,是程序的入口。通常情况下,程序从入口函数开始执行,并在程序的其他部分中调用其他函数或方法。Python程序的入口函数为main函数。
if name == ‘main’:
main()
在Python中,当我们执行一个.py文件的时候,解释器会首先读取整个文件,并且把默认的模块变量在全局命名空间中识别出来。然后,根据Python文件中是否存在main函数,来决定是否执行main函数。
待导入的txt文件
张三,18,98
李四,29,87
完整脚本
#! -*- coding:utf-8 -*-
import pymongo
import os
import bson
import time
# 安装环境依赖 Python3.6
# ====以下参数配置,根据实际情况修改====================
MONGO_URL = "mongodb://root:123456@10.1.69.30:27019/study?authSource=admin"
#数据库
DATABASE = "study"
# 导入文件的路径
FILE_PATH = "./source.txt"
# 文件的编码格式
FILE_ENCODING = "utf-8"
# 文件的分隔符
FILE_SPLIT = ",";
# 前缀
PREFIX = "TEST";
# 数据格式
student={
"name" : "张三",
"age" : 13,
"score" : 98,
"createById":"147258",
"createTime" : bson.Int64(1698740329762)
}
# ====以上是参数配置,根据实际情况修改====================
num=0
CLIENT = pymongo.MongoClient(MONGO_URL)
def getStudnet(name):
ta = _mongoClient.get_database(DATABASE).get_collection("Student");
object = ta.find_one({"name": name});
return object;
def addStudnet(info):
ta = _mongoClient.get_database(DATABASE).get_collection("Student");
ta.insert_one(info);
def init():
info={};
info['createById']=student['createById'];
return info;
def start_business(num):
with open(FILE_PATH, 'r',encoding=FILE_ENCODING) as f:
line = f.readline()
while line:
line1= line.strip()
# print(line1)
arr=line1.split(FILE_SPLIT)
name=PREFIX+arr[0]
# print(name)
info=getStudnet(name)
# print(info)
if(info == None):
info=init();
info['name']=name;
info['age']=arr[1];
info['socre']=arr[2];
info['createTime']=bson.Int64(int(time.time()*1000));
addStudnet(info)
info=None
num+=1;
else:
print(name+"已存在,不再导入")
line = f.readline()
return num;
if __name__ == '__main__':
start=int(time.time())
_mongoClient = CLIENT;
total=start_business(num);
_mongoClient.close();
end = int(time.time());
print("总共导入"+str(total)+"条数据,耗时"+str(end-start)+"秒")
执行脚本
python3 study.py
2. 从第三方库查询拿到id然后查询己方库导出
主要涉及http操作、pymongo操作、字符串操作、三目运算符、list切片、json解析
python发送post,timeout设置连接超时和读取超时
requests.post(SENSORS_URL, headers=headers, data=data,timeout=(CONNECTION_TIMEOUT,READ_TIMEOUT))
json解析
result_dict = json.loads(str(result));
三目运算符
total= len1//PAGE_SIZE_CRM if len1%PAGE_SIZE_CRM0 else len1//PAGE_SIZE_CRM+1
等价于java:total= len1%PAGE_SIZE_CRM0? len1//PAGE_SIZE_CRM:len1//PAGE_SIZE_CRM+1
list切片
参考 Python 列表(List)基本操作
使用切片访问列表的格式为 list_name[strat : end : step] ,其中,start 表示起始索引,end 表示结束索引,step 表示步长。step不写默认为1。如果切片到了最后一次,则不写end.
while i<total:
if i+1==total:
list1=list0[iPAGE_SIZE_CRM:];
else:
list1=list0[iPAGE_SIZE_CRM:(i+1)*PAGE_SIZE_CRM];
pymongo操作
只返回部分字段,操作同mongo js 脚本,1为展示,0为不展示。
collection=_mongoClient.get_database(DATABASE).get_collection(“Student”);
with collection.find({“id”:{“$in”:list1}},{ “id”: 1, “name”: 1, “age”: 1 }) as cursor:
for stu in cursor:
print(stu.get(‘id’)+“,”+stu.get(‘name’)+“,”+stu.get(‘age’));
完整脚本
#! -*- coding:utf-8 -*-
import pymongo
import requests
import json
from datetime import datetime, timezone, timedelta
from bson import ObjectId
# 安装环境依赖 Python3.6
# ====以下参数配置,根据实际情况修改====================
MONGO_URL = "mongodb://root:123456@10.1.69.30:27019/study?authSource=admin"
SENSORS_URL = "https://scmanager.study.com/api/sql/query?token=123456789&project=default"
# 数据库
DATABASE = "study"
# 每次查询神策的条数
PAGE_SIZE = 10000
# 每次查询study的条数
PAGE_SIZE_CRM = 100
# 连接超时
CONNECTION_TIMEOUT = 30;
# 读取超时
READ_TIMEOUT = 30;
# ====以上是参数配置,根据实际情况修改====================
CLIENT = pymongo.MongoClient(MONGO_URL)
def getMongoDBClient():
CLIENT = pymongo.MongoClient(MONGO_URL)
return CLIENT
# 请求神策
def request_sensors(table, page, pageSize=100):
# curl
# 'https://scmanager.study.com/api/sql/query?token=123456789&project=production' - X
# POST - -data - urlencode
# "q=select distinct_id,base_time from student limit 10;" --data-urlencode
# "format=json"
headers = {
'Content-Type': 'application/x-www-form-urlencoded',
}
start = (page - 1) * pageSize
end = pageSize;
data = f'q=select name from {table} order by id limit {start},{end} ;&format=json'
resp = requests.post(SENSORS_URL, headers=headers, data=data,timeout=(CONNECTION_TIMEOUT,READ_TIMEOUT))
return resp;
# 分页请求神策
def start_business():
print("id,name,age")
page = 1
while (True):
try:
response = request_sensors("users",page, PAGE_SIZE);
if (response.status_code != 200):
print(f"status_codePAGE_SIZE:{PAGE_SIZE},page:{page}");
break;
current_page_size = format_sensors_response(response.content);
if (current_page_size < PAGE_SIZE):
break;
page = page + 1;
except Exception as ex:
break;
return;
# 解析神策返回并打印目标数据
def format_sensors_response(content):
list0=[];
result_content = str(content, "utf-8").strip();
result_list = result_content.split('\n');
for result in result_content.split('\n'):
result_dict = json.loads(str(result));
id = result_dict['id'];
if(id != None):
list0.append(id);
len1 = len(list0)
if len1>0:
list1=[];
total= len1//PAGE_SIZE_CRM if len1%PAGE_SIZE_CRM==0 else len1//PAGE_SIZE_CRM+1
i=0
while i<total:
if i+1==total:
list1=list0[i*PAGE_SIZE_CRM:];
else:
list1=list0[i*PAGE_SIZE_CRM:(i+1)*PAGE_SIZE_CRM];
with _mongoClient.get_database(DATABASE).get_collection("Student").find({"id":{"$in":list1}},{ "id": 1, "name": 1, "age": 1 }) as cursor:
for stu in cursor:
print(stu.get('id')+","+stu.get('name')+","+stu.get('age'));
list1.clear();
i=i+1;
return len(result_list);
if __name__ == '__main__':
_mongoClient = getMongoDBClient();
start_business();
_mongoClient.close();
执行脚本
python3 study.py >study.csv
3. 踩坑
python没有大括号来控制代码段,而是使用缩进来控制代码段。
python中的True和False首字母必须大写,在pymongo中涉及boolean类型也是使用首字母大写
SyntaxError: invalid syntax:语法错误,极有可能是末尾冒号:忘记写了
如果脚本window下能执行,linux下不能执行,请先检测两个环境的版本是否一致,如果一致还报错,则添加打印日志,看具体是哪一行执行阻塞