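Before analysing the command, it helps to look at a few classes that makemigrations calls. Knowing them makes the later walkthrough of the command much easier.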
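1. The MigrationRecorder class
This class lives in django/db/migrations/recorder.py and handles the migration records stored in the database. Every migrated database contains a django_migrations table, and this class is what reads and writes it.
Migration is not a plain function: it is a lazily built class property that defines the model class for django_migrations
__init__ stores the database connection; connection will be explained in detail later, when covering the ORM
migration_qs queries all migration records and returns a QuerySet object
has_table is clear from its docstring: it checks whether the django_migrations table exists
ensure_schema checks whether the table exists and creates it if not; it is called later, before the table's data is modified
applied_migrations returns a dict of migration instances keyed by (app, name)
record_applied creates a migration record
record_unapplied deletes a migration record
flush deletes all migration records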
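To make the method list above concrete, here is a minimal sketch of the same interface on top of the standard-library sqlite3 module. TinyRecorder and its column layout are my own stand-ins for illustration, not Django's implementation; in particular, this applied_migrations maps each key to a timestamp string, whereas Django's maps it to a Migration model instance.

```python
import sqlite3


class TinyRecorder:
    """Hypothetical stand-in for MigrationRecorder, backed by sqlite3."""

    TABLE = "django_migrations"

    def __init__(self, connection):
        self.connection = connection

    def has_table(self):
        # Look the table up in SQLite's catalogue.
        row = self.connection.execute(
            "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?",
            (self.TABLE,),
        ).fetchone()
        return row is not None

    def ensure_schema(self):
        # Create the table only when it is missing.
        if not self.has_table():
            self.connection.execute(
                f"CREATE TABLE {self.TABLE} (app TEXT, name TEXT, applied TEXT)"
            )

    def applied_migrations(self):
        # Map (app, name) -> applied timestamp; empty if the table is absent.
        if not self.has_table():
            return {}
        rows = self.connection.execute(
            f"SELECT app, name, applied FROM {self.TABLE}"
        )
        return {(app, name): applied for app, name, applied in rows}

    def record_applied(self, app, name):
        self.ensure_schema()
        self.connection.execute(
            f"INSERT INTO {self.TABLE} VALUES (?, ?, datetime('now'))", (app, name)
        )

    def record_unapplied(self, app, name):
        self.ensure_schema()
        self.connection.execute(
            f"DELETE FROM {self.TABLE} WHERE app = ? AND name = ?", (app, name)
        )

    def flush(self):
        # Delete every record but keep the table itself.
        self.connection.execute(f"DELETE FROM {self.TABLE}")


recorder = TinyRecorder(sqlite3.connect(":memory:"))
recorder.record_applied("auth", "0001_initial")
recorder.record_applied("auth", "0002_alter_permission_name_max_length")
recorder.record_unapplied("auth", "0002_alter_permission_name_max_length")
applied_keys = sorted(recorder.applied_migrations())
```

After these calls only ('auth', '0001_initial') remains recorded, and flush() would empty the table without dropping it.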
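2. The MigrationGraph class
This class lives in django/db/migrations/graph.py and handles the relationships between migration files; each migration file depends on the one before it.
The file first defines a Node class, whose initialiser sets the key, children and parents attributes. The DummyNode class is explained well by its docstring: it does not correspond to a migration file on disk, and a dummy node that is still present after the graph has been processed signals a problem, so an exception is raised.
MigrationGraph itself represents a directed graph of all migrations in the project. Its initialiser defines two dicts: node_map, e.g. {'key': Node('m1')}, and nodes, e.g. {'key': Migration()}
add_node asserts that key is not already in node_map, then adds the entry to both dicts
add_dummy_node adds a dummy node, one that has no corresponding migration file in nodes
add_dependency adds a dependency. It first checks whether child and parent exist in nodes (adding a dummy node for any that is missing), then fetches the Node objects from node_map, uses add_parent to give the child its parent and add_child to give the parent its child, and finally validates that no dummy nodes remain
remove_replaced_nodes removes replaced nodes: replacement is the replacing node and replaced are the nodes being replaced. It first looks up the replacement node, then iterates over the replaced nodes, popping each one from nodes and node_map. For every child of a replaced node it first removes the child's parent link to the replaced node; then, if the child is not itself among the replaced nodes (this prevents a self-referencing cycle), it adds the child to the replacement node and the replacement node as the child's parent. Parents are handled with the same logic
remove_replacement_node does the opposite: it removes the replacement node and remaps its dependencies back to the replaced nodes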
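The relinking in remove_replaced_nodes is easier to follow on a tiny example. The sketch below is a trimmed-down graph of my own (TinyGraph is a hypothetical name; it has no dummy nodes and no validation, and migrations are plain strings), so treat it as an illustration of the idea rather than Django's code.

```python
class Node:
    def __init__(self, key):
        self.key = key
        self.children = set()
        self.parents = set()


class TinyGraph:
    """Hypothetical, trimmed-down migration graph (no dummy nodes, no validation)."""

    def __init__(self):
        self.node_map = {}  # key -> Node
        self.nodes = {}     # key -> migration object (a plain string here)

    def add_node(self, key, migration):
        assert key not in self.node_map
        self.node_map[key] = Node(key)
        self.nodes[key] = migration

    def add_dependency(self, child, parent):
        # Link both directions: child -> parent and parent -> child.
        self.node_map[child].parents.add(self.node_map[parent])
        self.node_map[parent].children.add(self.node_map[child])

    def remove_replaced_nodes(self, replacement, replaced):
        replaced = set(replaced)
        replacement_node = self.node_map[replacement]
        for key in replaced:
            self.nodes.pop(key, None)
            old = self.node_map.pop(key, None)
            if old is None:
                continue
            for child in old.children:
                child.parents.remove(old)
                # Skip links inside the replaced set to avoid self-references.
                if child.key not in replaced:
                    replacement_node.children.add(child)
                    child.parents.add(replacement_node)
            for parent in old.parents:
                parent.children.remove(old)
                if parent.key not in replaced:
                    replacement_node.parents.add(parent)
                    parent.children.add(replacement_node)


graph = TinyGraph()
for name in ("0001", "0002", "0003", "0001_squashed_0002"):
    graph.add_node(("app", name), f"<Migration {name}>")
graph.add_dependency(("app", "0002"), ("app", "0001"))
graph.add_dependency(("app", "0003"), ("app", "0002"))
graph.remove_replaced_nodes(
    ("app", "0001_squashed_0002"), [("app", "0001"), ("app", "0002")]
)
parents_of_0003 = {p.key for p in graph.node_map[("app", "0003")].parents}
```

Here 0003 originally depended on 0002; once the squashed migration takes over, 0003 depends on the squashed node and the two replaced nodes are gone from both dicts.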
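3. The MigrationLoader class
This class lives in django/db/migrations/loader.py. It loads migration files from disk and reads their applied state from the database; the details are in the class docstring. The main method here is build_graph():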
def build_graph(self):
    """
    Build a migration dependency graph using both the disk and database.
    You'll need to rebuild the graph if you apply migrations. This isn't
    usually a problem as generally migration stuff runs in a one-shot process.
    """
    # Load disk data
    self.load_disk()
    # Load database data
    if self.connection is None:
        self.applied_migrations = {}
    else:
        # With a connection available, query the django_migrations table
        # for the migrations that are already applied
        recorder = MigrationRecorder(self.connection)
        self.applied_migrations = recorder.applied_migrations()  # returns a dict: {('auth', '0001_initial'): Migration object}
    # To start, populate the migration graph with nodes for ALL migrations
    # and their dependencies. Also make note of replacing migrations at this step.
    self.graph = MigrationGraph()
    self.replacements = {}
    for key, migration in self.disk_migrations.items():
        self.graph.add_node(key, migration)  # add every migration as a node, e.g. key=('auth', '0001_initial')
        # Replacing migrations.
        if migration.replaces:  # does this Migration's replaces list name any migrations to replace?
            self.replacements[key] = migration
    for key, migration in self.disk_migrations.items():
        # Internal (same app) dependencies.
        self.add_internal_dependencies(key, migration)  # dependencies within the same app
    # Add external dependencies now that the internal ones have been resolved.
    for key, migration in self.disk_migrations.items():  # dependencies on other apps
        self.add_external_dependencies(key, migration)
    # Carry out replacements where possible and if enabled.
    if self.replace_migrations:
        for key, migration in self.replacements.items():
            # Get applied status of each of this migration's replacement
            # targets.
            applied_statuses = [
                (target in self.applied_migrations) for target in migration.replaces
            ]
            # The replacing migration is only marked as applied if all of
            # its replacement targets are.
            if all(applied_statuses):
                self.applied_migrations[key] = migration
            else:
                self.applied_migrations.pop(key, None)
            # A replacing migration can be used if either all or none of
            # its replacement targets have been applied.
            if all(applied_statuses) or (not any(applied_statuses)):
                self.graph.remove_replaced_nodes(key, migration.replaces)
            else:
                # This replacing migration cannot be used because it is
                # partially applied. Remove it from the graph and remap
                # dependencies to it (#25945).
                self.graph.remove_replacement_node(key, migration.replaces)
    # Ensure the graph is consistent.
    try:
        self.graph.validate_consistency()
    except NodeNotFoundError as exc:
        # Check if the missing node could have been replaced by any squash
        # migration but wasn't because the squash migration was partially
        # applied before. In that case raise a more understandable exception
        # (#23556).
        # Get reverse replacements.
        reverse_replacements = {}
        for key, migration in self.replacements.items():
            for replaced in migration.replaces:
                reverse_replacements.setdefault(replaced, set()).add(key)
        # Try to reraise exception with more detail.
        if exc.node in reverse_replacements:
            candidates = reverse_replacements.get(exc.node, set())
            is_replaced = any(
                candidate in self.graph.nodes for candidate in candidates
            )
            if not is_replaced:
                tries = ", ".join("%s.%s" % c for c in candidates)
                raise NodeNotFoundError(
                    "Migration {0} depends on nonexistent node ('{1}', '{2}'). "
                    "Django tried to replace migration {1}.{2} with any of [{3}] "
                    "but wasn't able to because some of the replaced migrations "
                    "are already applied.".format(
                        exc.origin, exc.node[0], exc.node[1], tries
                    ),
                    exc.node,
                ) from exc
        raise
    self.graph.ensure_not_cyclic()
My comments here are not very detailed; I will read this part more carefully on a second pass through the source.
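The part of build_graph() that decides what to do with a squashed migration boils down to an all / none / some check. A minimal sketch of just that decision follows; replacement_decision and its string return values are hypothetical names of mine, chosen only for illustration.

```python
def replacement_decision(applied, replaces):
    """Return (mark_as_applied, action) for one squashed migration.

    applied  -- set of (app, name) keys already recorded as applied
    replaces -- list of (app, name) keys the squashed migration replaces
    """
    statuses = [target in applied for target in replaces]
    # The squashed migration counts as applied only if every target is applied.
    mark_as_applied = all(statuses)
    if all(statuses) or not any(statuses):
        # All or none applied: the squashed migration can stand in for its targets.
        action = "remove_replaced_nodes"
    else:
        # Partially applied: drop the squashed migration, keep the originals.
        action = "remove_replacement_node"
    return mark_as_applied, action


replaces = [("app", "0001"), ("app", "0002")]
none_applied = replacement_decision(set(), replaces)
all_applied = replacement_decision(set(replaces), replaces)
partly_applied = replacement_decision({("app", "0001")}, replaces)
```

Only the partially applied case falls back to the original migrations, which matches the branch that calls remove_replacement_node in the code above.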
build_graph() begins by calling the load_disk method:
def load_disk(self):
    """Load the migrations from all INSTALLED_APPS from disk."""
    self.disk_migrations = {}  # migrations found on disk
    self.unmigrated_apps = set()  # apps without migrations
    self.migrated_apps = set()  # apps with migrations
    for app_config in apps.get_app_configs():  # iterate over the installed apps
        # Get the migrations module directory
        module_name, explicit = self.migrations_module(app_config.label)  # resolve the migrations module name
        if module_name is None:
            self.unmigrated_apps.add(app_config.label)
            continue
        was_loaded = module_name in sys.modules  # was the module already imported?
        try:
            module = import_module(module_name)  # import the module
        except ModuleNotFoundError as e:
            if (explicit and self.ignore_no_migrations) or (
                not explicit and MIGRATIONS_MODULE_NAME in e.name.split(".")
            ):
                self.unmigrated_apps.add(app_config.label)
                continue
            raise
        else:
            # Module is not a package (e.g. migrations.py).
            if not hasattr(module, "__path__"):  # no __path__ means the module is not a package, so treat the app as unmigrated
                self.unmigrated_apps.add(app_config.label)
                continue
            # Empty directories are namespaces. Namespace packages have no
            # __file__ and don't use a list for __path__. See
            # https://docs.python.org/3/reference/import.html#namespace-packages
            if getattr(module, "__file__", None) is None and not isinstance(
                module.__path__, list
            ):
                self.unmigrated_apps.add(app_config.label)
                continue
            # Force a reload if it's already loaded (tests need this)
            if was_loaded:
                reload(module)  # reload the module
        self.migrated_apps.add(app_config.label)
        # All migration modules inside the app's migrations package
        migration_names = {
            name
            for _, name, is_pkg in pkgutil.iter_modules(module.__path__)
            if not is_pkg and name[0] not in "_~"
        }  # walk the migrations directory, skipping packages and names that start with _ or ~
        # Load migrations
        for migration_name in migration_names:
            migration_path = "%s.%s" % (module_name, migration_name)  # build the dotted path, e.g. django.contrib.auth.migrations.0001_initial
            try:
                migration_module = import_module(migration_path)
            except ImportError as e:
                if "bad magic number" in str(e):
                    raise ImportError(
                        "Couldn't import %r as it appears to be a stale "
                        ".pyc file." % migration_path
                    ) from e
                else:
                    raise
            # A migration file must define a Migration class, otherwise an exception is raised
            if not hasattr(migration_module, "Migration"):
                raise BadMigrationError(
                    "Migration %s in app %s has no Migration class"
                    % (migration_name, app_config.label)
                )
            # The key is a 2-tuple (app label, migration name); the value is an
            # instance of the Migration class defined in the migration file, e.g.
            # self.disk_migrations['auth', '0001_initial'] = Migration('0001_initial', 'auth')
            self.disk_migrations[app_config.label, migration_name] = migration_module.Migration(
                migration_name,
                app_config.label,
            )  # fill in the disk_migrations dict initialised above
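The name filter in load_disk() can be tried outside Django. The sketch below builds a throwaway directory (the file names are made up for the example) and runs the same set comprehension over it with pkgutil.iter_modules.

```python
import pkgutil
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    migrations_dir = Path(tmp)
    for filename in (
        "__init__.py",
        "0001_initial.py",
        "0002_add_field.py",
        "_draft.py",
        "~backup.py",
    ):
        (migrations_dir / filename).touch()
    # A nested package, which the filter also skips.
    (migrations_dir / "helpers").mkdir()
    (migrations_dir / "helpers" / "__init__.py").touch()

    # Same filter as load_disk(): no packages, no names starting with _ or ~.
    migration_names = {
        name
        for _, name, is_pkg in pkgutil.iter_modules([str(migrations_dir)])
        if not is_pkg and name[0] not in "_~"
    }
```

Only the two numbered files survive the filter. Note that iter_modules expects a list of paths, which is why the directory is wrapped in a list.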
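The makemigrations command class
This class lives in django.core.management.commands.makemigrations. As usual, its first method is add_arguments(). The way it declares arguments is worth borrowing for any script that needs command-line options.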
def add_arguments(self, parser):
    parser.add_argument(  # optionally restrict the command to specific apps
        "args",
        metavar="app_label",
        nargs="*",
        help="Specify the app label(s) to create migrations for.",
    )
    parser.add_argument(  # dry run: show the migrations without writing any files
        "--dry-run",
        action="store_true",
        help="Just show what migrations would be made; don't actually write them.",
    )
    parser.add_argument(  # enable resolving of migration conflicts by merging
        "--merge",
        action="store_true",
        help="Enable fixing of migration conflicts.",
    )
    parser.add_argument(
        "--empty",
        action="store_true",
        help="Create an empty migration.",
    )
    parser.add_argument(
        "--noinput",
        "--no-input",
        action="store_false",
        dest="interactive",
        help="Tells Django to NOT prompt the user for input of any kind.",
    )
    parser.add_argument(
        "-n",
        "--name",
        help="Use this name for migration file(s).",
    )
    parser.add_argument(  # do not add header comments to the migration file
        "--no-header",
        action="store_false",
        dest="include_header",
        help="Do not add header comments to new migration file(s).",
    )
    parser.add_argument(
        "--check",
        action="store_true",
        dest="check_changes",
        help="Exit with a non-zero status if model changes are missing migrations.",
    )
    parser.add_argument(
        "--scriptable",
        action="store_true",
        dest="scriptable",
        help=(
            "Divert log output and input prompts to stderr, writing only "
            "paths of generated migration files to stdout."
        ),
    )
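The parser passed to add_arguments() behaves like a standard argparse parser, so the same patterns carry over to ordinary scripts. A small sketch, assuming a standalone script (the prog name myscript is made up), showing the positional nargs="*" argument and the store_false / dest pairing used for --noinput:

```python
import argparse

parser = argparse.ArgumentParser(prog="myscript")
# Zero or more positional values, collected into a list.
parser.add_argument("args", metavar="app_label", nargs="*")
# A plain on/off flag: False unless given.
parser.add_argument("--dry-run", action="store_true")
# Two spellings of one flag; giving either sets interactive to False.
parser.add_argument(
    "--noinput", "--no-input", action="store_false", dest="interactive"
)
parser.add_argument("-n", "--name")

options = parser.parse_args(["blog", "shop", "--dry-run", "--no-input"])
defaults = parser.parse_args([])
```

With store_false the attribute defaults to True, so the script stays interactive unless the flag is passed. That is why handle() can read options["interactive"] directly.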
The most important method is still handle():
@no_translations
def handle(self, *app_labels, **options):
    self.written_files = []
    self.verbosity = options["verbosity"]  # -v: output verbosity
    self.interactive = options["interactive"]
    self.dry_run = options["dry_run"]  # do not write migration files
    self.merge = options["merge"]  # merge conflicting migrations
    self.empty = options["empty"]
    self.migration_name = options["name"]  # custom migration name
    if self.migration_name and not self.migration_name.isidentifier():  # reject names that are not valid Python identifiers
        raise CommandError("The migration name must be a valid Python identifier.")
    self.include_header = options["include_header"]
    check_changes = options["check_changes"]
    self.scriptable = options["scriptable"]
    # If logs and prompts are diverted to stderr, remove the ERROR style.
    if self.scriptable:
        self.stderr.style_func = None
    # Make sure the app they asked for exists
    app_labels = set(app_labels)
    has_bad_labels = False
    for app_label in app_labels:
        try:
            apps.get_app_config(app_label)
        except LookupError as err:
            self.stderr.write(str(err))
            has_bad_labels = True
    if has_bad_labels:
        sys.exit(2)
    # Load the current graph state. Pass in None for the connection so
    # the loader doesn't try to resolve replaced migrations from DB.
    loader = MigrationLoader(None, ignore_no_migrations=True)
    # Raise an error if any migrations are applied before their dependencies.
    consistency_check_labels = {config.label for config in apps.get_app_configs()}
    # Non-default databases are only checked if database routers used.
    aliases_to_check = (
        connections if settings.DATABASE_ROUTERS else [DEFAULT_DB_ALIAS]
    )
    for alias in sorted(aliases_to_check):
        connection = connections[alias]
        if connection.settings_dict["ENGINE"] != "django.db.backends.dummy" and any(
            # At least one model must be migrated to the database.
            router.allow_migrate(
                connection.alias, app_label, model_name=model._meta.object_name
            )
            for app_label in consistency_check_labels
            for model in apps.get_app_config(app_label).get_models()
        ):
            try:
                loader.check_consistent_history(connection)
            except OperationalError as error:
                warnings.warn(
                    "Got an error checking a consistent migration history "
                    "performed for database connection '%s': %s" % (alias, error),
                    RuntimeWarning,
                )
    # Before anything else, see if there's conflicting apps and drop out
    # hard if there are any and they don't want to merge
    conflicts = loader.detect_conflicts()
    # If app_labels is specified, filter out conflicting migrations for
    # unspecified apps.
    if app_labels:
        conflicts = {
            app_label: conflict
            for app_label, conflict in conflicts.items()
            if app_label in app_labels
        }
    if conflicts and not self.merge:
        name_str = "; ".join(
            "%s in %s" % (", ".join(names), app) for app, names in conflicts.items()
        )
        raise CommandError(
            "Conflicting migrations detected; multiple leaf nodes in the "
            "migration graph: (%s).\nTo fix them run "
            "'python manage.py makemigrations --merge'" % name_str
        )
    # If they want to merge and there's nothing to merge, then politely exit
    if self.merge and not conflicts:  # --merge was requested but there are no conflicts
        self.log("No conflicts detected to merge.")
        return
    # If they want to merge and there is something to merge, then
    # divert into the merge code
    if self.merge and conflicts:
        return self.handle_merge(loader, conflicts)
    if self.interactive:
        questioner = InteractiveMigrationQuestioner(
            specified_apps=app_labels,
            dry_run=self.dry_run,
            prompt_output=self.log_output,
        )
    else:
        questioner = NonInteractiveMigrationQuestioner(
            specified_apps=app_labels,
            dry_run=self.dry_run,
            verbosity=self.verbosity,
            log=self.log,
        )
    # Set up autodetector
    autodetector = MigrationAutodetector(
        loader.project_state(),
        ProjectState.from_apps(apps),
        questioner,
    )
    # If they want to make an empty migration, make one for each app
    if self.empty:
        if not app_labels:
            raise CommandError(
                "You must supply at least one app label when using --empty."
            )
        # Make a fake changes() result we can pass to arrange_for_graph
        changes = {app: [Migration("custom", app)] for app in app_labels}
        changes = autodetector.arrange_for_graph(
            changes=changes,
            graph=loader.graph,
            migration_name=self.migration_name,
        )
        self.write_migration_files(changes)
        return
    # Detect changes
    changes = autodetector.changes(  # the migrations to generate, e.g. {'app': [<Migration middle.0001_initial>]}
        graph=loader.graph,
        trim_to_apps=app_labels or None,
        convert_apps=app_labels or None,
        migration_name=self.migration_name,
    )
    if not changes:
        # No changes? Tell them.
        if self.verbosity >= 1:
            if app_labels:
                if len(app_labels) == 1:
                    self.log("No changes detected in app '%s'" % app_labels.pop())
                else:
                    self.log(
                        "No changes detected in apps '%s'"
                        % ("', '".join(app_labels))
                    )
            else:
                self.log("No changes detected")
    else:
        # Write the migration files
        self.write_migration_files(changes)
        if check_changes:
            sys.exit(1)
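The conflict check in handle() relies on the idea that an app with more than one leaf node in the graph has diverging migrations. The sketch below is my own reconstruction of that idea on plain tuples. The function mirrors the name loader.detect_conflicts but is an assumption about its behaviour, not a copy of Django's code, and the migration names are invented.

```python
def detect_conflicts(leaf_nodes):
    """Map each app with several leaf migrations to their sorted names."""
    seen_apps = {}
    conflicting = set()
    for app_label, migration_name in leaf_nodes:
        # A second leaf for the same app means the history has forked.
        if app_label in seen_apps:
            conflicting.add(app_label)
        seen_apps.setdefault(app_label, set()).add(migration_name)
    return {app: sorted(seen_apps[app]) for app in conflicting}


leaves = [
    ("blog", "0003_add_tags"),
    ("blog", "0003_add_slug"),
    ("auth", "0012_example"),
]
conflicts = detect_conflicts(leaves)

# Same narrowing as in handle(): keep only conflicts for the requested apps.
app_labels = {"auth"}
filtered = {app: names for app, names in conflicts.items() if app in app_labels}
```

In this example blog has two leaves and is reported as conflicting, but a run restricted to auth would not see that conflict, because the filter drops apps that were not named on the command line.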
def write_migration_files(self, changes):
    """
    Take a changes dict and write them out as migration files.
    """
    directory_created = {}
    for app_label, app_migrations in changes.items():
        if self.verbosity >= 1:
            self.log(self.style.MIGRATE_HEADING("Migrations for '%s':" % app_label))
        for migration in app_migrations:
            # Describe the migration
            writer = MigrationWriter(migration, self.include_header)
            if self.verbosity >= 1:
                # Display a relative path if it's below the current working
                # directory, or an absolute path otherwise.
                try:
                    migration_string = os.path.relpath(writer.path)
                except ValueError:
                    migration_string = writer.path
                if migration_string.startswith(".."):
                    migration_string = writer.path
                self.log(" %s\n" % self.style.MIGRATE_LABEL(migration_string))
                for operation in migration.operations:
                    self.log(" - %s" % operation.describe())  # log a description of each operation
                if self.scriptable:
                    self.stdout.write(migration_string)
            if not self.dry_run:
                # Write the migrations file to the disk.
                migrations_directory = os.path.dirname(writer.path)
                if not directory_created.get(app_label):
                    os.makedirs(migrations_directory, exist_ok=True)
                    init_path = os.path.join(migrations_directory, "__init__.py")
                    if not os.path.isfile(init_path):
                        open(init_path, "w").close()
                    # We just do this once per app
                    directory_created[app_label] = True
                migration_string = writer.as_string()  # content of the migration file
                with open(writer.path, "w", encoding="utf-8") as fh:
                    fh.write(migration_string)
                    self.written_files.append(writer.path)
            elif self.verbosity == 3:
                # Alternatively, makemigrations --dry-run --verbosity 3
                # will log the migrations rather than saving the file to
                # the disk.
                self.log(
                    self.style.MIGRATE_HEADING(
                        "Full migrations file '%s':" % writer.filename
                    )
                )
                self.log(writer.as_string())
    run_formatters(self.written_files)
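The disk-writing half of write_migration_files() follows a pattern that is handy elsewhere: create the package directory and its __init__.py once per app, then write each file. A minimal sketch in a temporary directory follows; write_files and the shape of its changes argument are hypothetical simplifications, with plain (filename, content) pairs standing in for MigrationWriter.

```python
import os
import tempfile


def write_files(changes, root, dry_run=False):
    """changes maps app_label -> list of (filename, content) pairs."""
    written = []
    directory_created = {}
    for app_label, files in changes.items():
        migrations_directory = os.path.join(root, app_label, "migrations")
        for filename, content in files:
            if dry_run:
                # A dry run touches nothing on disk.
                continue
            if not directory_created.get(app_label):
                os.makedirs(migrations_directory, exist_ok=True)
                init_path = os.path.join(migrations_directory, "__init__.py")
                if not os.path.isfile(init_path):
                    open(init_path, "w").close()
                # Only needed once per app.
                directory_created[app_label] = True
            path = os.path.join(migrations_directory, filename)
            with open(path, "w", encoding="utf-8") as fh:
                fh.write(content)
            written.append(path)
    return written


with tempfile.TemporaryDirectory() as root:
    changes = {"blog": [("0001_initial.py", "# generated\n")]}
    skipped = write_files(changes, root, dry_run=True)
    written = write_files(changes, root)
    created = sorted(os.listdir(os.path.join(root, "blog", "migrations")))
```

The dry run returns an empty list and creates nothing, while the real run leaves both __init__.py and the migration file in the app's migrations directory.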