使用自定义 Django 命令自动重新加载 Celery 工作线程

使用自定义 django 命令自动重新加载 celery 工作线程

celery 之前有一个 –autoreload 标志,现已被删除。然而,django 在其manage.py runserver 命令中内置了自动重新加载功能。 celery workers 中缺乏自动重新加载会造成令人困惑的开发体验:更新 python 代码会导致 django 服务器使用当前代码重新加载,但服务器触发的任何任务都将在 celery workers 中运行过时的代码。

这篇文章将向您展示如何构建自定义的 manage.py runworker 命令,该命令在开发过程中自动重新加载 celery 工作线程。该命令将模仿 runserver,我们将看看 django 的自动重新加载是如何在幕后工作的。

在我们开始之前

这篇文章假设您有一个已经安装了 celery 的 django 应用程序(指南)。它还假设您了解 django 中的项目和应用程序之间的差异。

所有源代码和文档链接都适用于发布时(2024 年 7 月)当前版本的 django 和 celery。如果你在遥远的将来读到这篇文章,事情可能已经改变了。

最后,主项目目录将在帖子的示例中命名为 my_project。

解决方案:自定义命令

我们将创建一个名为 runworker 的自定义管理.py 命令。因为 django 通过其 runsever 命令提供自动重新加载,所以我们将使用 runserver 的源代码作为我们自定义命令的基础。

您可以通过在项目的任何应用程序中创建 management/commands/ 目录来在 django 中创建命令。创建目录后,您可以在该目录中放置一个带有您要创建的命令名称的 python 文件(文档)。

假设您的项目有一个名为 polls 的应用程序,我们将在 polls/management/commands/runworker.py 中创建一个文件并添加以下代码:

# polls/management/commands/runworker.pyimport sysfrom datetime import datetimefrom celery.signals import worker_initfrom django.conf import settingsfrom django.core.management.base import basecommandfrom django.utils import autoreloadfrom my_project.celery import app as celery_appclass command(basecommand):    help = "starts a celery worker instance with auto-reloading for development."    # validation is called explicitly each time the worker instance is reloaded.    requires_system_checks = []    suppressed_base_arguments = {"--verbosity", "--traceback"}    def add_arguments(self, parser):        parser.add_argument(            "--skip-checks",            action="store_true",            help="skip system checks.",        )        parser.add_argument(            "--loglevel",            choices=("debug", "info", "warning", "error", "critical", "fatal"),            type=str.upper,  # transforms user input to uppercase.            default="info",        )    def handle(self, *args, **options):        autoreload.run_with_reloader(self.run_worker, **options)    def run_worker(self, **options):        # if an exception was silenced in managementutility.execute in order        # to be raised in the child process, raise it now.        autoreload.raise_last_exception()        if not options["skip_checks"]:            self.stdout.write("performing system checks...nn")            self.check(display_num_errors=true)        # need to check migrations here, so can't use the        # requires_migrations_check attribute.        self.check_migrations()        # print django info to console when the worker initializes.        worker_init.connect(self.on_worker_init)        # start the celery worker.        celery_app.worker_main(            [                "--app",                "my_project",                "--skip-checks",                "worker",                "--loglevel",                options["loglevel"],            ]        )    def on_worker_init(self, sender, **kwargs):        quit_command = "ctrl-break" if sys.platform == "win32" else "control-c"        now = datetime.now().strftime("%b %d, %y - %x")        version = self.get_version()        print(            f"{now}n"            f"django version {version}, using settings {settings.settings_module!r}n"            f"quit the worker instance with {quit_command}.",            file=self.stdout,        )

重要提示: 请务必将 my_project 的所有实例替换为您的 django 项目的名称。

如果您想复制并粘贴此代码并继续编程,您可以安全地停在这里,而无需阅读本文的其余部分。这是一个优雅的解决方案,将在您开发 django 和 celery 项目时为您提供良好的服务。但是,如果您想了解更多有关其工作原理的信息,请继续阅读。

它是如何工作的(可选)

我不会逐行查看此代码,而是按主题讨论最有趣的部分。如果您还不熟悉 django 自定义命令,您可能需要在继续之前查看文档。

自动装弹

这部分感觉最神奇。在命令的handle()方法体内,调用了django的内部autoreload.run_with_reloader()。它接受一个回调函数,每次项目中的 python 文件发生更改时都会执行该函数。 实际上是如何运作的?

让我们看一下 autoreload.run_with_reloader() 函数源代码的简化版本。简化的函数重写、内联和删除代码,以使其操作更加清晰。

# NOTE: This has been dramatically pared down for clarity.def run_with_reloader(callback_func, *args, **kwargs):    # NOTE: This will evaluate to False the first time it is run.    is_inside_subprocess = os.getenv("RUN_MAIN") == "true"    if is_inside_subprocess:        # The reloader watches for Python file changes.        reloader = get_reloader()        django_main_thread = threading.Thread(            target=callback_func, args=args, kwargs=kwargs        )        django_main_thread.daemon = True        django_main_thread.start()        # When the code changes, the reloader exits with return code 3.        reloader.run(django_main_thread)    else:        # Returns Python path and the arguments passed to the command.        # Example output: ['/path/to/python', './manage.py', 'runworker']        args = get_child_arguments()        subprocess_env = {**os.environ, "RUN_MAIN": "true"}        while True:            # Rerun the manage.py command in a subprocess.            p = subprocess.run(args, env=subprocess_env, close_fds=False)            if p.returncode != 3:                sys.exit(p.returncode)

当manage.py runworker在命令行中运行时,它会首先调用handle()方法,该方法会调用run_with_reloader()。

在 run_with_reloader() 内部,它将检查名为 run_main 的环境变量是否具有“true”值。当函数第一次被调用时,run_main 应该没有值。

当run_main没有设置为“true”时,run_with_reloader()会进入循环。在循环内,它将启动一个子进程,重新运行传入的manage.py [command_name],然后等待该子进程退出。如果子进程退出并返回代码 3,则循环的下一次迭代将启动一个新的子进程并等待。该循环将一直运行,直到子进程返回不为 3 的退出代码(或直到用户使用 ctrl + c 退出)。一旦得到非3的返回码,就会彻底退出程序。

生成的子进程再次运行manage.py命令(在我们的例子中是manage.py runworker),并且该命令将再次调用run_with_reloader()。这次,run_main 将被设置为“true”,因为该命令在子进程中运行。

现在 run_with_reloader() 知道它位于子进程中,它将获得一个监视文件更改的重新加载器,将提供的回调函数放入线程中,并将其传递给开始监视更改的重新加载器。

当重新加载器检测到文件更改时,它会运行 sys.exit(3)。这将退出子流程,从而触发生成子流程的代码的下一次循环迭代。反过来,会启动一个使用更新版本代码的新子流程。

系统检查和迁移

默认情况下,django 命令在运行其handle() 方法之前执行系统检查。但是,对于 runserver 和我们的自定义 runworker 命令,我们希望推迟运行这些命令,直到进入我们提供给 run_with_reloader() 的回调中。在我们的例子中,这是我们的 run_worker() 方法。这使我们能够运行自动重新加载的命令,同时修复损坏的系统检查。

为了推迟运行系统检查,需要将requires_system_checks属性的值设置为空列表,并通过在run_worker()主体中调用self.check()来执行检查。与 runserver 一样,我们的自定义 runworker 命令也会检查所有迁移是否已运行,如果有待处理的迁移,它会显示警告。

因为我们已经在 run_worker() 方法中执行 django 的系统检查,所以我们通过向 celery 传递 –skip-checks 标志来禁用系统检查,以防止重复工作。

所有与系统检查和迁移相关的代码都是直接从 runserver 命令源代码中提取的。

celery_app.worker_main()

我们的实现使用 celery_app.worker_main() 直接从 python 启动 celery 工作程序,而不是向 celery 发起攻击。

on_worker_init()

此代码在工作进程初始化时执行,显示日期和时间、django 版本以及退出命令。它是根据 runserver 启动时显示的信息建模的。

其他 runserver 样板

以下行也从 runserver 源代码中删除:

suppressed_base_arguments = {“–verbosity”, “–traceback”}autoreload.raise_last_exception()

日志级别

我们的自定义命令具有可配置的日志级别,以防开发人员希望在不修改代码的情况下从 cli 调整设置。

更进一步

我研究了 django 和 celery 的源代码来构建这个实现,并且有很多扩展它的机会。您可以配置该命令以接受更多 celery 的工作参数。或者,您可以创建一个自定义的 manage.py 命令,它会自动重新加载任何 shell 命令,就像 david browne 在本要点中所做的那样。

如果您觉得本文有用,请随时留下点赞或评论。感谢您的阅读。

以上就是使用自定义 Django 命令自动重新加载 Celery 工作线程的详细内容,更多请关注创想鸟其它相关文章!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/1348098.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
“从概念到代码:使用 Python 构建提醒应用程序”
上一篇 2025年12月13日 11:44:31
字符串和尾随逗号,耦合并成为,Tuple ():将错误复制并粘贴到错误和概念
下一篇 2025年12月13日 11:44:42

相关推荐

  • 学习Python需要具备哪些基础知识?

    学习python需要具备以下基础知识:1.编程基础:理解变量、数据类型、控制结构、函数和模块。2.算法与数据结构:掌握列表、字典、集合等数据结构及排序、搜索等算法。3.面向对象编程:熟悉类、对象、继承、封装和多态。4.python特有的特性:了解列表推导式、生成器、装饰器等。5.开发工具和环境:熟练…

    2026年5月10日
    000
  • 理解 Python 赋值语句的语法结构

    赋值语句是任何编程语言的基础,Python 也不例外。为了理解 Python 赋值语句的底层语法结构,我们需要深入研究其 Backus-Naur 范式(BNF)定义。很多人在初次接触 Python 语法定义时,可能会对复杂的 BNF 表达式感到困惑,尤其是当试图将一个简单的赋值语句,例如 a = 9…

    2026年5月10日
    000
  • Python OpenCV 视频录制:解决0KB文件或损坏问题的教程

    本教程旨在解决使用Python OpenCV进行视频录制时,生成0KB或损坏MP4文件的问题。核心原因在于cv2.VideoWriter的写入分辨率与摄像头实际输出分辨率不匹配。文章将详细指导如何正确获取摄像头实际工作分辨率,并将其应用于视频写入器,确保录制过程顺畅,生成可播放的视频文件。 1. O…

    2026年5月10日
    000
  • global在python中啥意思

    global 在 Python 中是一个关键字,用于嵌套函数中访问和修改全局变量:使用 global 关键字将局部变量声明为全局变量。这样做后,局部变量实际上指向了全局变量,而不是创建新的变量。 global 在 Python 中的含义 回答:global 在 Python 中是一个关键字,用于在嵌…

    2026年5月10日
    000
  • 解决 Golang JSON 反序列化 Python 字符串问题

    本文旨在解决 Golang 在反序列化由 Python 产生的 JSON 字符串时遇到的编码问题。核心问题在于 Python 的字符串类型与 Golang 期望的 JSON 格式存在差异,导致解码错误。本文将提供一种通过在 Python 端使用 `json` 库正确生成 JSON 字符串的方法,从而…

    2026年5月10日
    000
  • Mypy配置文件中如何正确排除特定文件夹?

    巧用Mypy配置文件排除特定文件夹 Mypy静态类型检查是提升Python代码质量的利器,但有时我们需要排除某些文件夹,例如虚拟环境文件夹/venv/,避免不必要的错误提示。本文将分析一个Mypy配置文件中排除文件夹无效的常见问题,并提供解决方案。 问题:Mypy配置文件中exclude选项无效 用…

    2026年5月10日
    000
  • 解决AWS CDK Python项目中的依赖冲突:CDK v1与v2共存问题

    本教程旨在解决aws cdk python项目中常见的依赖冲突,特别是当cdk v1和v2版本库在同一环境中混淆时引发的问题。核心冲突在于不同cdk版本对`constructs`库的依赖范围不兼容。文章将详细指导如何通过创建和管理独立的python虚拟环境来彻底解决此类冲突,确保项目依赖的稳定安装与…

    2026年5月10日
    000
  • 解决Django Raw Queryset参数绑定错误:避免id内置函数陷阱

    本文深入探讨了在Django中使用raw查询时,因误将Python内置函数id作为参数传入而导致的ProgrammingError。文章详细解释了该错误的根源,提供了正确的参数绑定方法,即使用具体的对象属性如product.id,并建议在多数情况下优先考虑Django ORM以提升代码的可读性和维护…

    2026年5月10日
    000
  • 优化Python中大量球体无重叠随机运动模拟的策略

    本文旨在探讨并优化在Python中模拟大量(百万级别)球体随机运动同时避免重叠的性能问题。针对初始方案中逐个球体移动和碰撞检测导致的效率低下,我们将介绍三种关键优化策略:利用scipy.spatial.cKDTree的批量邻居查询、启用多核并行处理,以及使用Numba加速计算密集型代码段。通过这些方…

    2026年5月10日
    000
  • 从动态网站抓取隐藏电话号码的实用教程

    本教程旨在解决使用beautifulsoup抓取动态加载内容时的局限性。当目标数据(如隐藏的电话号码)通过javascript异步加载时,传统html解析器无法获取。文章将指导读者如何利用浏览器开发者工具识别并模拟网站后端api请求,特别是graphql请求,从而直接获取所需数据。通过python的…

    2026年5月10日
    000
  • python如何判断一个字符串是否全是数字_python isdigit()等方法判断字符串是否为纯数字

    判断字符串是否为纯数字可通过isdigit()、isnumeric()、isdecimal()和正则表达式实现;其中isdigit()适用于ASCII数字,isnumeric()支持更广的数字类型,isdecimal()仅限十进制,正则^d+$可灵活匹配但性能较低;含符号或小数可用float()转换…

    2026年5月10日
    100
  • python中log函数用法 python对数计算方法

    在python中,log函数用于进行对数计算。1)使用math.log()计算自然对数或任意底数的对数;2)使用numpy.log()和numpy.log2()等函数进行高效的对数计算,特别适合处理大规模数据和数组。 在Python中,log函数是用来进行对数计算的强大工具。无论你是做科学计算、数据…

    2026年5月10日
    000
  • pycharm怎么缩小代码

    要缩小 PyCharm 中的代码,可以采用以下步骤:代码折叠以隐藏无关代码。使用 Docstring 注释来记录实现细节。使用重构工具优化代码结构。优化循环和条件语句以提高效率。使用外部库节省重复冗余。配置代码样式检查器以确保代码一致性。遵循 PEP8 指南以提高可读性和可维护性。 如何缩小 PyC…

    2026年5月10日
    000
  • c++如何与Python交互_c++与Python混合编程方法

    ctypes适用于调用C风格简单函数,需将C++封装为extern “C”并编译为共享库,Python通过CDLL加载;2. pybind11是现代首选,支持类、STL容器和重载,编译后生成可import的模块;3. Boost.Python功能强但依赖庞大,配置复杂,逐渐被…

    2026年5月10日
    000
  • Go 语言中的匿名函数(Lambda 表达式)应用指南

    Go语言支持匿名函数,这与许多其他语言中的Lambda表达式概念相似。本文将深入探讨Go语言中匿名函数的定义、使用场景及其作为一等公民的特性,并通过代码示例展示如何在Go中实现类似Lambda的功能,帮助开发者理解并有效利用这一强大特性。 Go 语言中的匿名函数概述 在go语言中,匿名函数(anon…

    2026年5月10日
    000
  • 使用Jinja2与Python动态加载并显示多张图片到HTML

    使用Jinja2与Python动态加载并显示多张图片到HTML使用Jinja2与Python动态加载并显示多张图片到HTML使用Jinja2与Python动态加载并显示多张图片到HTML使用Jinja2与Python动态加载并显示多张图片到HTML

    本文详细介绍了如何利用Jinja2模板引擎与Python后端,高效地将多张图片动态加载并渲染到HTML页面中。核心方法在于将图片数据组织成一个包含字典的列表,其中每个字典代表一张图片及其属性(如标题和文件路径),并通过Jinja2的`for`循环在HTML模板中迭代渲染,从而实现灵活且可维护的多图片…

    2026年5月10日 用户投稿
    100
  • python平方根怎么求

    Python 计算平方根的方法有:使用 math.sqrt() 函数使用 operator**使用内置的 pow() 函数 如何用 Python 计算平方根 Python 提供了多种方法来计算平方根,其中最常用的函数是 math.sqrt() 函数。 使用 math.sqrt() 函数 math.s…

    2026年5月10日
    000
  • Python Pandas如何为数据列添加序号,且连续相同值赋予相同序号?

    使用Python Pandas为数据列添加序号,并为连续相同的值赋予相同的序号。 许多数据处理任务需要对数据列进行编号,且要求连续相同的数值拥有相同的序号,而不同的数值则序号递增。本文将演示如何利用Python的Pandas库高效地实现此功能。 假设有一列数据:[11, 21, 24, 24, 24…

    2026年5月10日
    000
  • 利用 LangChain 的 NLP 功能进行 AI 驱动的图探索,使用 Langchain 进行问答

    编写复杂的SQL或图形数据库查询是否曾让您感到头疼?如果只需用简单的英语描述您的需求就能直接获得结果,那该多好?借助自然语言处理技术的进步,LangChain等工具不仅让这一切成为现实,而且操作起来非常直观。 本文将演示如何结合Python、LangChain和Neo4j,使用自然语言流畅地查询图形…

    2026年5月10日
    000
  • Go 语言中的泛型:概念、影响与演进

    泛型是一种允许在编译时使用类型参数编写代码的编程范式,它使得函数或数据结构能够处理多种数据类型,从而实现代码复用和类型安全。在静态类型语言中,泛型的缺失曾导致大量重复代码,开发者不得不为不同类型的数据集合编写功能相同的函数。go 1.18版本引入泛型后,有效解决了这一痛点,显著提升了代码的灵活性和可…

    2026年5月10日
    000

发表回复

登录后才能评论
关注微信