首页 > 编程语言 >django 源码解读 python manage.py makemigrations

django 源码解读 python manage.py makemigrations

时间:2023-03-02 11:37:26浏览次数:51  
标签:name makemigrations self py migrations add 源码 migration app

分析命令之前,需要先了解makemigrations 调用的一些类。这样对于后面分析命令时很轻松。

1. MigrationRecorder类

这个类在django/db/migrations/recorder.py文件中,这个类是处理保存在数据库中的迁移记录。在生成的数据库中有django_migrations这样一张表,主要是处理这里的内容。


这个Migration 函数主要是定义django_migrations的模型类
init 初始化连接的数据库,这个connection之后在讲解ORM的时候回去详细解释

migration_qs函数是查询所有的迁移记录,返回一个queryset 对象
has_table 函数注释已经很清楚了,就是判断django_migrations表存不存在
ensure_schema 函数先判断表格是否存在,如果不存在,则创建,这个后面操作表数据时会调用

applid_migrations 返回迁移实例的字典
record_applied 创建一条迁移记录
record_unapplied 删除一条迁移记录
flush 删除所有的迁移记录

2. MigrationGraph类

这个类在django/db/migrations/graph.py文件中,这个类是处理迁移文件之间的关系,迁移文件会依赖上一个迁移文件。 这是一个Node类,初始化定义一个key, children, parents 属性。

这个DummyNode类的注释解释的很清楚,不对应磁盘上的迁移文件,当出现亚节点的时候表示这个节点是有问题的,这时就会抛出异常

这表示项目中所有迁移的有向图, 初始化定义了一个node_map字典,{'key': Node('m1')}, nodes {'key': Migration()}

add_node 先断言key 不在 node_map中,再向字典中添加数据
add_dummy_node 添加亚节点,这个nodes中没有对应的迁移文件
add_dependency 添加一个依赖,这个先去判断 child,parent是否存在于nodes中,再获取node_map中的Node,使用Node中的add_parent方法添加
父节点,再使用add_child 给父节点添加子节点,最后需要验证是否存在亚节点

remove_replaced_nodes 替换节点,replacement 替换的节点,replaced 被替换节点,先找到待替换节点
遍历被替换节点,从nodes和node_map中查找被替换节点,遍历被替换节点的子节点,将自己点的父节点先移除,也就是被替换节点
在判断如果子节点不在被替换节点中(防止节点之间依赖循环),将替换节点添加子节点,子节点添加父节点。父节点一样的逻辑

remove_replacement_node 移除被替换节点,和上面的函数相反





3. MigrationLoader

这个类在django/db/migrations/loader.py文件中,这个类是从磁盘加载迁移文件并且从数据库中获取状态 这个的注释已经写在文件中了。

这里主要的方法就是 build_graph()

点击查看 build_graph
    def build_graph(self):
        """
        Build a migration dependency graph using both the disk and database.
        You'll need to rebuild the graph if you apply migrations. This isn't
        usually a problem as generally migration stuff runs in a one-shot process.
        """
        # Load disk data 从磁盘上载入迁移数据
        self.load_disk()
        # Load database data
        if self.connection is None:
            self.applied_migrations = {}
        else:
            # 如果有数据链接信息,直接查询迁移记录中已经应用的migration, 从django_migrations表中获取数据
            recorder = MigrationRecorder(self.connection)
            self.applied_migrations = recorder.applied_migrations()  # 返回一个字典 {('auth', '0001_initial'): Migration 对象}
        # To start, populate the migration graph with nodes for ALL migrations
        # and their dependencies. Also make note of replacing migrations at this step.
        self.graph = MigrationGraph()
        self.replacements = {}
        for key, migration in self.disk_migrations.items():
            self.graph.add_node(key, migration)  # 添加所有的迁移节点 key=('auth', '0001_initial')
            # Replacing migrations.
            if migration.replaces:  # 判断 Migration 对象中的replaces 列表,是否有要替换的节点
                self.replacements[key] = migration
        for key, migration in self.disk_migrations.items():
            # Internal (same app) dependencies.
            self.add_internal_dependencies(key, migration)   #  添加同应用内的依赖
        # Add external dependencies now that the internal ones have been resolved.
        for key, migration in self.disk_migrations.items():    # 其他应用的依赖
            self.add_external_dependencies(key, migration)
        # Carry out replacements where possible and if enabled.  在可能的情况下进行替换。
        if self.replace_migrations:
            for key, migration in self.replacements.items():
                # Get applied status of each of this migration's replacement
                # targets.
                applied_statuses = [
                    (target in self.applied_migrations) for target in migration.replaces
                ]
                # The replacing migration is only marked as applied if all of
                # its replacement targets are.
                if all(applied_statuses):
                    self.applied_migrations[key] = migration
                else:
                    self.applied_migrations.pop(key, None)
                # A replacing migration can be used if either all or none of
                # its replacement targets have been applied.
                if all(applied_statuses) or (not any(applied_statuses)):
                    self.graph.remove_replaced_nodes(key, migration.replaces)
                else:
                    # This replacing migration cannot be used because it is
                    # partially applied. Remove it from the graph and remap
                    # dependencies to it (#25945).
                    self.graph.remove_replacement_node(key, migration.replaces)
        # Ensure the graph is consistent.
        try:
            self.graph.validate_consistency()
        except NodeNotFoundError as exc:
            # Check if the missing node could have been replaced by any squash
            # migration but wasn't because the squash migration was partially
            # applied before. In that case raise a more understandable exception
            # (#23556).
            # Get reverse replacements.
            reverse_replacements = {}
            for key, migration in self.replacements.items():
                for replaced in migration.replaces:
                    reverse_replacements.setdefault(replaced, set()).add(key)
            # Try to reraise exception with more detail.
            if exc.node in reverse_replacements:
                candidates = reverse_replacements.get(exc.node, set())
                is_replaced = any(
                    candidate in self.graph.nodes for candidate in candidates
                )
                if not is_replaced:
                    tries = ", ".join("%s.%s" % c for c in candidates)
                    raise NodeNotFoundError(
                        "Migration {0} depends on nonexistent node ('{1}', '{2}'). "
                        "Django tried to replace migration {1}.{2} with any of [{3}] "
                        "but wasn't able to because some of the replaced migrations "
                        "are already applied.".format(
                            exc.origin, exc.node[0], exc.node[1], tries
                        ),
                        exc.node,
                    ) from exc
            raise
        self.graph.ensure_not_cyclic()

这里的注释没有写的很详细,等二刷源码是仔细看看

build_graph() 方法中开始调用 load_disk 方法

点击查看代码
 def load_disk(self):
        """Load the migrations from all INSTALLED_APPS from disk."""
        self.disk_migrations = {}     # 磁盘迁移文件
        self.unmigrated_apps = set()  # 没有迁移的应用
        self.migrated_apps = set()    # 已经迁移的应用
        for app_config in apps.get_app_configs():  # 获取settings中的应用
            # Get the migrations module directory
            module_name, explicit = self.migrations_module(app_config.label)  # 获取迁移模块目录
            if module_name is None:
                self.unmigrated_apps.add(app_config.label)
                continue
            was_loaded = module_name in sys.modules  # 判断模块是否导入
            try:
                module = import_module(module_name)  # 导入模块
            except ModuleNotFoundError as e:
                if (explicit and self.ignore_no_migrations) or (
                    not explicit and MIGRATIONS_MODULE_NAME in e.name.split(".")
                ):
                    self.unmigrated_apps.add(app_config.label)
                    continue
                raise
            else:
                # Module is not a package (e.g. migrations.py).
                if not hasattr(module, "__path__"):  # 判断module 是否有__path__属性,如果没有说明还没有生成迁移文件
                    self.unmigrated_apps.add(app_config.label)
                    continue
                # Empty directories are namespaces. Namespace packages have no
                # __file__ and don't use a list for __path__. See
                # https://docs.python.org/3/reference/import.html#namespace-packages
                if getattr(module, "__file__", None) is None and not isinstance(
                    module.__path__, list
                ):
                    self.unmigrated_apps.add(app_config.label)
                    continue
                # Force a reload if it's already loaded (tests need this)
                if was_loaded:
                    reload(module)  # 重新加载module
            self.migrated_apps.add(app_config.label)
            # 应用的迁移模块下的所有迁移文件
            migration_names = {
                name
                for _, name, is_pkg in pkgutil.iter_modules(module.__path__)
                if not is_pkg and name[0] not in "_~"
            }  # 遍历migrations 文件夹下的所有文件并且排除不是包和_开头的文件
            # Load migrations
            for migration_name in migration_names:
                migration_path = "%s.%s" % (module_name, migration_name)  # 拼接migration路径 django.contrib.auth.migrations.0001_initial
                try:
                    migration_module = import_module(migration_path)
                except ImportError as e:
                    if "bad magic number" in str(e):
                        raise ImportError(
                            "Couldn't import %r as it appears to be a stale "
                            ".pyc file." % migration_path
                        ) from e
                    else:
                        raise
                # 迁移文件中必须定义Migration类,否则直接抛出异常
                if not hasattr(migration_module, "Migration"):
                    raise BadMigrationError(
                        "Migration %s in app %s has no Migration class"
                        % (migration_name, app_config.label)
                    )
                # key是一个二元组(应用标签,迁移名), value是迁移文件中定义的Migration类的实例化对象
                # self.disk_migrations['auth', '0001_initial'] = Migrations('0001_initial','auth' )
                self.disk_migrations[app_config.label, migration_name] = migration_module.Migration(
                    migration_name,
                    app_config.label,
                )  # 更新初始化中的 disk_migrations

makemigrations 的操作类

这个类在 django.core.management.commands.makemigrations 中 第一个函数还是 add_arguments()

这里添加参数的方法可以借鉴,以后写脚本需要参数,可以使用

点击查看代码
    def add_arguments(self, parser):
        parser.add_argument(  # 可以指定 app
            "args",
            metavar="app_label",
            nargs="*",
            help="Specify the app label(s) to create migrations for.",
        )
        parser.add_argument(  # 试运行迁移文件,但不会去创建迁移文件
            "--dry-run",
            action="store_true",
            help="Just show what migrations would be made; don't actually write them.",
        )
        parser.add_argument(  # 强制合并迁移文件的冲突
            "--merge",
            action="store_true",
            help="Enable fixing of migration conflicts.",
        )
        parser.add_argument(
            "--empty",
            action="store_true",
            help="Create an empty migration.",
        )
        parser.add_argument(  #
            "--noinput",
            "--no-input",
            action="store_false",
            dest="interactive",
            help="Tells Django to NOT prompt the user for input of any kind.",
        )
        parser.add_argument(
            "-n",
            "--name",
            help="Use this name for migration file(s).",
        )
        parser.add_argument(  # 取消往迁移文件中添加注释
            "--no-header",
            action="store_false",
            dest="include_header",
            help="Do not add header comments to new migration file(s).",
        )
        parser.add_argument(
            "--check",
            action="store_true",
            dest="check_changes",
            help="Exit with a non-zero status if model changes are missing migrations.",
        )
        parser.add_argument(
            "--scriptable",
            action="store_true",
            dest="scriptable",
            help=(
                "Divert log output and input prompts to stderr, writing only "
                "paths of generated migration files to stdout."
            ),
        )

最重要的方法还是 handle 函数

点击查看代码
    @no_translations
    def handle(self, *app_labels, **options):
        self.written_files = []
        self.verbosity = options["verbosity"]  # -v 打印详情
        self.interactive = options["interactive"]
        self.dry_run = options["dry_run"]  # 不会生成迁移
        self.merge = options["merge"]     # 合并
        self.empty = options["empty"]
        self.migration_name = options["name"]   #  自定义迁移名
        if self.migration_name and not self.migration_name.isidentifier():  # 如果migration 文件名称不符合python规范,会报错
            raise CommandError("The migration name must be a valid Python identifier.")
        self.include_header = options["include_header"]
        check_changes = options["check_changes"]
        self.scriptable = options["scriptable"]
        # If logs and prompts are diverted to stderr, remove the ERROR style.
        if self.scriptable:  # 如果日志和提示被转移到stderr,则删除ERROR样式。
            self.stderr.style_func = None

        # Make sure the app they asked for exists
        app_labels = set(app_labels)
        has_bad_labels = False
        for app_label in app_labels:
            try:
                apps.get_app_config(app_label)
            except LookupError as err:
                self.stderr.write(str(err))
                has_bad_labels = True
        if has_bad_labels:
            sys.exit(2)

        # Load the current graph state. Pass in None for the connection so
        # the loader doesn't try to resolve replaced migrations from DB.
        loader = MigrationLoader(None, ignore_no_migrations=True)  #

        # Raise an error if any migrations are applied before their dependencies.
        consistency_check_labels = {config.label for config in apps.get_app_configs()}
        # Non-default databases are only checked if database routers used.
        aliases_to_check = (
            connections if settings.DATABASE_ROUTERS else [DEFAULT_DB_ALIAS]
        )
        for alias in sorted(aliases_to_check):
            connection = connections[alias]
            if connection.settings_dict["ENGINE"] != "django.db.backends.dummy" and any(
                # At least one model must be migrated to the database.
                router.allow_migrate(
                    connection.alias, app_label, model_name=model._meta.object_name
                )
                for app_label in consistency_check_labels
                for model in apps.get_app_config(app_label).get_models()
            ):
                try:
                    loader.check_consistent_history(connection)
                except OperationalError as error:
                    warnings.warn(
                        "Got an error checking a consistent migration history "
                        "performed for database connection '%s': %s" % (alias, error),
                        RuntimeWarning,
                    )
        # Before anything else, see if there's conflicting apps and drop out
        # hard if there are any and they don't want to merge
        conflicts = loader.detect_conflicts()

        # If app_labels is specified, filter out conflicting migrations for
        # unspecified apps.
        if app_labels:
            conflicts = {
                app_label: conflict
                for app_label, conflict in conflicts.items()
                if app_label in app_labels
            }

        if conflicts and not self.merge:
            name_str = "; ".join(
                "%s in %s" % (", ".join(names), app) for app, names in conflicts.items()
            )
            raise CommandError(
                "Conflicting migrations detected; multiple leaf nodes in the "
                "migration graph: (%s).\nTo fix them run "
                "'python manage.py makemigrations --merge'" % name_str
            )

        # If they want to merge and there's nothing to merge, then politely exit
        if self.merge and not conflicts:  # 如果没有冲突,但是需要合并的时候打印
            self.log("No conflicts detected to merge.")
            return

        # If they want to merge and there is something to merge, then
        # divert into the merge code
        if self.merge and conflicts:
            return self.handle_merge(loader, conflicts)

        if self.interactive:
            questioner = InteractiveMigrationQuestioner(
                specified_apps=app_labels,
                dry_run=self.dry_run,
                prompt_output=self.log_output,
            )
        else:
            questioner = NonInteractiveMigrationQuestioner(
                specified_apps=app_labels,
                dry_run=self.dry_run,
                verbosity=self.verbosity,
                log=self.log,
            )
        # Set up autodetector
        autodetector = MigrationAutodetector(
            loader.project_state(),
            ProjectState.from_apps(apps),
            questioner,
        )

        # If they want to make an empty migration, make one for each app
        if self.empty:
            if not app_labels:
                raise CommandError(
                    "You must supply at least one app label when using --empty."
                )
            # Make a fake changes() result we can pass to arrange_for_graph
            changes = {app: [Migration("custom", app)] for app in app_labels}
            changes = autodetector.arrange_for_graph(
                changes=changes,
                graph=loader.graph,
                migration_name=self.migration_name,
            )
            self.write_migration_files(changes)
            return

        # Detect changes
        changes = autodetector.changes(   # 一方面去取生成的迁移文件  {'app': [<Migration middle.0001.initial>]}
            graph=loader.graph,
            trim_to_apps=app_labels or None,
            convert_apps=app_labels or None,
            migration_name=self.migration_name,
        )

        if not changes:
            # No changes? Tell them.
            if self.verbosity >= 1:
                if app_labels:
                    if len(app_labels) == 1:
                        self.log("No changes detected in app '%s'" % app_labels.pop())
                    else:
                        self.log(
                            "No changes detected in apps '%s'"
                            % ("', '".join(app_labels))
                        )
                else:
                    self.log("No changes detected")
        else:
            # 写迁移文件
            self.write_migration_files(changes)
            if check_changes:
                sys.exit(1)
调用的 write_migration_files 函数
点击查看代码
    def write_migration_files(self, changes):
        """
        Take a changes dict and write them out as migration files.
        """
        directory_created = {}
        for app_label, app_migrations in changes.items():
            if self.verbosity >= 1:
                self.log(self.style.MIGRATE_HEADING("Migrations for '%s':" % app_label))
            for migration in app_migrations:
                # Describe the migration
                writer = MigrationWriter(migration, self.include_header)
                if self.verbosity >= 1:
                    # Display a relative path if it's below the current working
                    # directory, or an absolute path otherwise.
                    try:
                        migration_string = os.path.relpath(writer.path)
                    except ValueError:
                        migration_string = writer.path
                    if migration_string.startswith(".."):
                        migration_string = writer.path
                    self.log("  %s\n" % self.style.MIGRATE_LABEL(migration_string))
                    for operation in migration.operations:
                        self.log("    - %s" % operation.describe())  # 打印创建表名称日志
                    if self.scriptable:
                        self.stdout.write(migration_string)
                if not self.dry_run:
                    # Write the migrations file to the disk.
                    migrations_directory = os.path.dirname(writer.path)
                    if not directory_created.get(app_label):
                        os.makedirs(migrations_directory, exist_ok=True)
                        init_path = os.path.join(migrations_directory, "__init__.py")
                        if not os.path.isfile(init_path):
                            open(init_path, "w").close()
                        # We just do this once per app
                        directory_created[app_label] = True
                    migration_string = writer.as_string()  # 迁移文件内容
                    with open(writer.path, "w", encoding="utf-8") as fh:
                        fh.write(migration_string)
                        self.written_files.append(writer.path)
                elif self.verbosity == 3:
                    # Alternatively, makemigrations --dry-run --verbosity 3
                    # will log the migrations rather than saving the file to
                    # the disk.
                    self.log(
                        self.style.MIGRATE_HEADING(
                            "Full migrations file '%s':" % writer.filename
                        )
                    )
                    self.log(writer.as_string())
        run_formatters(self.written_files)

标签:name,makemigrations,self,py,migrations,add,源码,migration,app
From: https://www.cnblogs.com/miss103/p/16976508.html

相关文章

  • torch&numpy常用函数总结
    np.dot()-向量内积np.matmul()&@运算符-矩阵乘法np.multiply()&*运算符-针对标量的运算(各个维度均可)&np.square()-等价于np.multiply(a,a)链接np.c_()a=n......
  • 有趣又实用的python脚本
    1.使用Python进行速度测试这个高级脚本帮助你使用Python测试你的Internet速度。只需安装速度测试模块并运行以下代码。#pipinstallpyspeedtest#pipinstalls......
  • Python抓取数据具体流程
    之前看了一段有关爬虫的网课深有启发,于是自己也尝试着如如何过去爬虫百科“python”词条等相关页面的整个过程记录下来,方便后期其他人一起来学习。抓取策略确定目标:重要......
  • 机器学习python环境搭建
    目的:跑通下面代码相关代码fromtorchimportnnimporttorchimportjiebaimportnumpyasnpraw_text="""越努力就越幸运"""words=list(jieba.cut(raw_text))......
  • Python第三天
    8bit(位)=1byte(字节)1024byte=1kbstr表示字符串(只要是双单引号里的都叫字符串)int表示整数(1、2、3、5)float表示浮点数(3.151) type()数据类型bool表示true,falseint()、str()、fl......
  • Tek示波器自动化测试(PyVISA)
    1.相关资料1.1PyVISAPyVISA·PyPI1.1.1用户指南用户指南—PyVISA文档1.1.2要求Python(使用3.6+测试)VISA(使用NI-VISA17.5,Win7,下载NI-VISA-NI) 1.1.3实例......
  • python 字符串 格式化输出 槽格式 小数的位数与符号控制
    """槽的格式限定冒号:左边填序号或名称,右边填写格式"""#定义一个数num=3.1465926#保留两位小数,并且四舍五入res="{:.2f}".format(num)print(res)#有符号的数字res2=......
  • python 字符串 格式化输出 槽格式 左中右对齐的内容
    """槽的格式限定冒号:左边填序号或名称,右边填写格式"""#右对齐的字符串,宽度为20s1="{:>20}".format("hello")print(s1)#右对齐,宽20,!填充s2="{:!>20}".format("python")p......
  • pyenv 中使用virtualenv
    其实按照官方文档就已经可以安装并使用pyenv和virtualenv了,但在实际操作过程中,你可能还是会遇到各种各样的问题,希望这篇文章能帮到你。值得注意的是,这篇文章所记录的操作都......
  • 【Mybatis】【数据源】【一】Mybatis源码解析-内置数据源
    1 前言这节我们要看一下数据源的东西了。比如我们以前在XML配置的什么驱动、url、账号密码啥的以及现在我们在SpringBoot下配置的其实都是为了创建我们的数据源,那么这节......