初入 python Django 框架总结

发布于:2025-06-09 ⋅ 阅读:(18) ⋅ 点赞:(0)

1. 初入

最近接收一个python 改造项目,初始信息满满,中间很疲惫、很想放弃,最后咬牙完成,先将遇到的问题、要点记录如下:

2. Django

Django的默写技术要点参考:
https://www.runoob.com/django/django-project-intro.html

2.1 模块

在这里插入图片描述
目录下建立一个__init__.py的空文件,这个目录就会被认为是一个模块;

2.2 应用

一个项目中可以新建多个应用-app,类似于不同业务功能的模块。
在 Django 里,应用属于项目内部独立的功能模块,具备自身的数据模型、视图、模板以及 URL 配置,能够独立运行,也能集成到其他项目中。应用的存在让代码实现了模块化管理,使项目结构更为清晰,也便于复用。
应用的主要用途
功能拆分:可以把项目按照功能拆分成不同的应用,比如博客项目可拆分为用户认证、文章管理、评论系统等多个应用。
代码复用:开发完成的应用能够在其他项目中直接使用,不用重新编写代码。
独立维护:各个应用之间相互独立,修改一个应用不会对其他应用造成影响。
应用和项目的区别
项目:它是配置和应用的集合,代表着整个 Web 服务,一个项目可以包含多个应用。
应用:专注于实现某个具体功能,例如用户管理、商品展示等。
应用的创建与使用
创建应用:借助命令 python manage.py startapp app_name 就能创建一个应用。
注册应用:要在项目的 settings.py 文件里的 INSTALLED_APPS 列表中添加应用名称,这样 Django 才能识别该应用。

2.3 数据库交互

Django 模型使用自带的 ORM。

对象关系映射(Object Relational Mapping,简称 ORM )用于实现面向对象编程语言里不同类型系统的数据之间的转换。

ORM 在业务逻辑层和数据库层之间充当了桥梁的作用。

ORM 是通过使用描述对象和数据库之间的映射的元数据,将程序中的对象自动持久化到数据库中。实例如下:

table_list_valuable = TableMeta.objects.filter(need_push=True,tablename__in=table_list)
 TableMeta.objects.filter(need_push=True, db=db).values_list('tablename', flat=True)} 
 update_time_key = ColumnMeta.objects.filter(table=table,columnname=db.update_time_key,is_valuable=True).first()
 max_time  = RecordOfKafkaPush.objects.filter(table_name=table.tablename, db_name=db.database).aggregate(max_time=Max('record_time'))['max_time']
 ColumnMeta.objects.filter(table=table,columnname=db.update_time_key,is_valuable=True).exists():
 RecordOfKafkaPush.objects.filter(db_name=db.database, table_name=table.tablename).delete()

3. 基础配置

3.1 依赖的下载 :

如果不翻墙,直接拉取某些国外依赖,一般会失败,比如:(cx_Oracle、pymssql、django),
可以通过miniconda 管理依赖,配置拉取国内镜像:
pip/conda/minicondata/

3.2 pycharm 如何配置debug

Django 项目控制台的启动命令是:

cd myproject
python manage.py runserver

但如果直接通过控制台启动的话,无法debug,Pycharm需要配置如下:
在这里插入图片描述
并且在settings.py中配置:
在这里插入图片描述

4. 开发接口

4.1 接口内容
@method_decorator(csrf_exempt, name='dispatch')
class ScheduleTask(View):
    def post(self, request, *args, **kwargs):
        try:
            data = json.loads(request.body)
            table_list = data.get('table_list', None)
            if table_list:
                # global import_table_list
                # import_table_list = table_list
                logging.info(f"接收到的表列表:{table_list}")
        except json.JSONDecodeError as e:
            logging.error(f"JSON解析错误: {e}")
            logging.info("没有接收到表列表,使用默认表列表")
        logging.info("start as task to init import")
        dbbase = getDbbase(True, table_list)
        return format_response({'message': 'OK'})

@method_decorator(csrf_exempt, name='dispatch')
class ScheduleTaskTwo(View):
    def post(self, request, *args, **kwargs):
        try:
            data = json.loads(request.body)
            table_list = data.get('table_list', None)
            if table_list:
                # global import_table_list
                # import_table_list = table_list
                logging.info(f"接收到的表列表:{table_list}")
        except json.JSONDecodeError as e:
            logging.error(f"JSON解析错误: {e}")
            logging.info("没有接收到表列表,使用默认表列表")
        logging.info("start as task to init import")
        dbbase = getDbbase(False, table_list)
        return format_response({'message': 'OK'})

4.2 接口定义
from django.urls import path
from . import views
from . import viewScond
app_name = 'api'

urlpatterns = [
    path('get_init_task', viewScond.ScheduleTask.as_view(),name='get_init_task'),
    path('get_add_task', viewScond.ScheduleTaskTwo.as_view(),name='get_add_task'),
]

4.2 接口调用

在这里插入图片描述

5. 定时任务写法

5.1 创建后台调度器
# 创建后台调度器
scheduler = BackgroundScheduler()
scheduler.add_job(task_of_schedule, 'interval', days=1)
scheduler.start()
# schedule.every(1).days.do(task_of_schedule)

参考:https://cloud.tencent.com/developer/article/2454200?policyId=1004

5.2 多线程、并发
def async_threaded(func):
    """将函数转换为异步线程调用的装饰器"""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        thread = threading.Thread(target=func, args=args, kwargs=kwargs)
        thread.start()
        return thread  # 返回线程对象,可用于join或检查状态
    return wrapper
@async_threaded
def task_of_schedule():
    logging.info("task_of_schedule start")
    getDbbase(False)
    logging.info("task_of_schedule end")