chore(db): compare schema differences against niushop

2025-11-29 17:51:17 +08:00
parent 1c9e72e28d
commit 3a7f510e19
4 changed files with 17348 additions and 0 deletions


@@ -0,0 +1,375 @@
import re
import os


# Parse an SQL file and extract its table structures
def parse_sql_file(file_path, ignore_prefix=None):
    tables = {}
    # Read the file content
    with open(file_path, 'r', encoding='utf-8') as f:
        content = f.read()
    # Match CREATE TABLE statements
    table_pattern = re.compile(r'CREATE TABLE\s+`?([^`\s]+)`?\s*\(([^;]+)\)\s*(?:[^;]+);', re.DOTALL | re.IGNORECASE)
    matches = table_pattern.findall(content)
    for full_table_name, table_def in matches:
        # Normalize the table name, dropping the prefix if one was given
        table_name = full_table_name
        if ignore_prefix and table_name.startswith(ignore_prefix):
            table_name = table_name[len(ignore_prefix):]
        # Extract column definitions
        columns = []
        # Match column definitions: name, type, and the trailing constraints
        column_pattern = re.compile(r'\s*`?([^`\s,]+)`?\s+([^\s,]+)\s*([^,]+)(?:,|$)', re.DOTALL)
        column_matches = column_pattern.findall(table_def)
        for col_name, col_type, col_constraints in column_matches:
            # Skip key/constraint lines that the loose column regex also matches
            if col_name.upper() in ('PRIMARY', 'UNIQUE', 'KEY', 'INDEX', 'CONSTRAINT', 'FULLTEXT'):
                continue
            # Collapse newlines and extra whitespace inside the constraints
            col_constraints = re.sub(r'\s+', ' ', col_constraints.strip())
            columns.append((col_name, col_type, col_constraints))
        # Extract the primary key
        primary_key = None
        pk_pattern = re.compile(r'PRIMARY\s+KEY\s*\(`?([^`\s,]+)`?\)', re.IGNORECASE)
        pk_match = pk_pattern.search(table_def)
        if pk_match:
            primary_key = pk_match.group(1)
        # Extract indexes
        indexes = []
        index_pattern = re.compile(r'INDEX\s+`?([^`\s]+)`?\s*\(([^)]+)\)', re.IGNORECASE)
        index_matches = index_pattern.findall(table_def)
        for index_name, index_cols in index_matches:
            indexes.append((index_name, index_cols.strip()))
        # Extract unique constraints
        unique_constraints = []
        unique_pattern = re.compile(r'UNIQUE\s+KEY\s*`?([^`\s]+)`?\s*\(([^)]+)\)', re.IGNORECASE)
        unique_matches = unique_pattern.findall(table_def)
        for unique_name, unique_cols in unique_matches:
            unique_constraints.append((unique_name, unique_cols.strip()))
        tables[table_name] = {
            'full_name': full_table_name,
            'columns': columns,
            'primary_key': primary_key,
            'indexes': indexes,
            'unique_constraints': unique_constraints
        }
    return tables
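

# For reference, parse_sql_file returns a dict keyed by the (prefix-stripped) table name:
#   {table_name: {'full_name': original table name,
#                 'columns': [(name, type, constraints), ...],
#                 'primary_key': column name or None,
#                 'indexes': [(index_name, index_columns), ...],
#                 'unique_constraints': [(constraint_name, constraint_columns), ...]}}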


# Compare the table structures of two databases
def compare_databases(db1, db2, db1_name, db2_name):
    # db1_name and db2_name are currently unused; the report uses the names passed to
    # generate_markdown_report instead
    diffs = {
        'only_in_db1': [],
        'only_in_db2': [],
        'column_diffs': {},
        'primary_key_diffs': {},
        'index_diffs': {},
        'unique_constraint_diffs': {}
    }
    # Tables that exist only in db1
    for table_name in db1:
        if table_name not in db2:
            diffs['only_in_db1'].append(table_name)
    # Tables that exist only in db2
    for table_name in db2:
        if table_name not in db1:
            diffs['only_in_db2'].append(table_name)
    # Compare the tables both databases have in common
    common_tables = set(db1.keys()) & set(db2.keys())
    for table_name in common_tables:
        table1 = db1[table_name]
        table2 = db2[table_name]
        # Compare columns
        col_diffs = {
            'only_in_db1': [],
            'only_in_db2': [],
            'type_diffs': [],
            'constraint_diffs': []
        }
        # Column lookups keyed by column name
        cols1 = {col[0]: col for col in table1['columns']}
        cols2 = {col[0]: col for col in table2['columns']}
        # Columns only in db1
        for col_name in cols1:
            if col_name not in cols2:
                col_diffs['only_in_db1'].append(col_name)
        # Columns only in db2
        for col_name in cols2:
            if col_name not in cols1:
                col_diffs['only_in_db2'].append(col_name)
        # Compare column types and constraints
        common_cols = set(cols1.keys()) & set(cols2.keys())
        for col_name in common_cols:
            col1 = cols1[col_name]
            col2 = cols2[col_name]
            # Type differences
            if col1[1] != col2[1]:
                col_diffs['type_diffs'].append((col_name, col1[1], col2[1]))
            # Constraint differences
            if col1[2] != col2[2]:
                col_diffs['constraint_diffs'].append((col_name, col1[2], col2[2]))
        if any(col_diffs.values()):
            diffs['column_diffs'][table_name] = col_diffs
        # Compare primary keys
        if table1['primary_key'] != table2['primary_key']:
            diffs['primary_key_diffs'][table_name] = (table1['primary_key'], table2['primary_key'])
        # Compare indexes
        index_diffs = {
            'only_in_db1': [],
            'only_in_db2': [],
            'definition_diffs': []
        }
        indexes1 = {idx[0]: idx[1] for idx in table1['indexes']}
        indexes2 = {idx[0]: idx[1] for idx in table2['indexes']}
        # Indexes only in db1
        for idx_name in indexes1:
            if idx_name not in indexes2:
                index_diffs['only_in_db1'].append((idx_name, indexes1[idx_name]))
        # Indexes only in db2
        for idx_name in indexes2:
            if idx_name not in indexes1:
                index_diffs['only_in_db2'].append((idx_name, indexes2[idx_name]))
        # Compare index definitions
        common_indexes = set(indexes1.keys()) & set(indexes2.keys())
        for idx_name in common_indexes:
            if indexes1[idx_name] != indexes2[idx_name]:
                index_diffs['definition_diffs'].append((idx_name, indexes1[idx_name], indexes2[idx_name]))
        if any(index_diffs.values()):
            diffs['index_diffs'][table_name] = index_diffs
        # Compare unique constraints
        unique_diffs = {
            'only_in_db1': [],
            'only_in_db2': [],
            'definition_diffs': []
        }
        unique1 = {uc[0]: uc[1] for uc in table1['unique_constraints']}
        unique2 = {uc[0]: uc[1] for uc in table2['unique_constraints']}
        # Unique constraints only in db1
        for uc_name in unique1:
            if uc_name not in unique2:
                unique_diffs['only_in_db1'].append((uc_name, unique1[uc_name]))
        # Unique constraints only in db2
        for uc_name in unique2:
            if uc_name not in unique1:
                unique_diffs['only_in_db2'].append((uc_name, unique2[uc_name]))
        # Compare unique constraint definitions
        common_unique = set(unique1.keys()) & set(unique2.keys())
        for uc_name in common_unique:
            if unique1[uc_name] != unique2[uc_name]:
                unique_diffs['definition_diffs'].append((uc_name, unique1[uc_name], unique2[uc_name]))
        if any(unique_diffs.values()):
            diffs['unique_constraint_diffs'][table_name] = unique_diffs
    return diffs
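

# For reference, compare_databases returns a dict of this shape:
#   {'only_in_db1': [table, ...], 'only_in_db2': [table, ...],
#    'column_diffs': {table: {'only_in_db1': [col], 'only_in_db2': [col],
#                             'type_diffs': [(col, type1, type2)],
#                             'constraint_diffs': [(col, constraints1, constraints2)]}},
#    'primary_key_diffs': {table: (pk1, pk2)},
#    'index_diffs': {table: {'only_in_db1': [(name, cols)], 'only_in_db2': [(name, cols)],
#                            'definition_diffs': [(name, cols1, cols2)]}},
#    'unique_constraint_diffs': {table: same shape as 'index_diffs'}}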


# Generate a Markdown-formatted diff report
def generate_markdown_report(diffs, db1_name, db2_name, db1_table_count, db2_table_count):
    report = []
    # Report title
    report.append(f"# 数据库差异报告: {db1_name} vs {db2_name}")
    report.append("\n## 1. 表数量统计")
    report.append("| 数据库文件 | 表数量 |")
    report.append("|------------|--------|")
    report.append(f"| {db1_name} | {db1_table_count} |")
    report.append(f"| {db2_name} | {db2_table_count} |")
    # Table existence differences
    report.append("\n## 2. 表存在性差异")
    # Tables only in db1
    if diffs['only_in_db1']:
        report.append(f"\n### 2.1 仅在 {db1_name} 中存在的表 ({len(diffs['only_in_db1'])} 个)")
        report.append("| 表名 |")
        report.append("|------|")
        for table in sorted(diffs['only_in_db1']):
            report.append(f"| {table} |")
    # Tables only in db2
    if diffs['only_in_db2']:
        report.append(f"\n### 2.2 仅在 {db2_name} 中存在的表 ({len(diffs['only_in_db2'])} 个)")
        report.append("| 表名 |")
        report.append("|------|")
        for table in sorted(diffs['only_in_db2']):
            report.append(f"| {table} |")
    # Column structure differences
    if diffs['column_diffs']:
        report.append(f"\n## 3. 列结构差异的表 ({len(diffs['column_diffs'])} 个)")
        for table, col_diffs in diffs['column_diffs'].items():
            report.append(f"\n### 3.1 表: {table}")
            # Columns only in db1
            if col_diffs['only_in_db1']:
                report.append(f"\n#### 3.1.1 仅在 {db1_name} 中存在的列")
                report.append("| 列名 |")
                report.append("|------|")
                for col in col_diffs['only_in_db1']:
                    report.append(f"| {col} |")
            # Columns only in db2
            if col_diffs['only_in_db2']:
                report.append(f"\n#### 3.1.2 仅在 {db2_name} 中存在的列")
                report.append("| 列名 |")
                report.append("|------|")
                for col in col_diffs['only_in_db2']:
                    report.append(f"| {col} |")
            # Column type differences
            if col_diffs['type_diffs']:
                report.append(f"\n#### 3.1.3 列类型差异")
                report.append(f"| 列名 | {db1_name} | {db2_name} |")
                report.append("|------|------------|------------|")
                for col_name, type1, type2 in col_diffs['type_diffs']:
                    report.append(f"| {col_name} | {type1} | {type2} |")
            # Column constraint differences
            if col_diffs['constraint_diffs']:
                report.append(f"\n#### 3.1.4 列约束差异")
                report.append(f"| 列名 | {db1_name} | {db2_name} |")
                report.append("|------|------------|------------|")
                for col_name, constraint1, constraint2 in col_diffs['constraint_diffs']:
                    report.append(f"| {col_name} | {constraint1} | {constraint2} |")
    # Primary key differences
    if diffs['primary_key_diffs']:
        report.append(f"\n## 4. 主键差异的表 ({len(diffs['primary_key_diffs'])} 个)")
        report.append(f"| 表名 | {db1_name} | {db2_name} |")
        report.append("|------|------------|------------|")
        for table, (pk1, pk2) in diffs['primary_key_diffs'].items():
            report.append(f"| {table} | {pk1} | {pk2} |")
    # Index differences
    if diffs['index_diffs']:
        report.append(f"\n## 5. 索引差异的表 ({len(diffs['index_diffs'])} 个)")
        for table, idx_diffs in diffs['index_diffs'].items():
            report.append(f"\n### 5.1 表: {table}")
            # Indexes only in db1
            if idx_diffs['only_in_db1']:
                report.append(f"\n#### 5.1.1 仅在 {db1_name} 中存在的索引")
                report.append("| 索引名 | 索引列 |")
                report.append("|--------|--------|")
                for idx_name, idx_cols in idx_diffs['only_in_db1']:
                    report.append(f"| {idx_name} | {idx_cols} |")
            # Indexes only in db2
            if idx_diffs['only_in_db2']:
                report.append(f"\n#### 5.1.2 仅在 {db2_name} 中存在的索引")
                report.append("| 索引名 | 索引列 |")
                report.append("|--------|--------|")
                for idx_name, idx_cols in idx_diffs['only_in_db2']:
                    report.append(f"| {idx_name} | {idx_cols} |")
            # Index definition differences
            if idx_diffs['definition_diffs']:
                report.append(f"\n#### 5.1.3 索引定义差异")
                report.append(f"| 索引名 | {db1_name} | {db2_name} |")
                report.append("|--------|------------|------------|")
                for idx_name, idx1, idx2 in idx_diffs['definition_diffs']:
                    report.append(f"| {idx_name} | {idx1} | {idx2} |")
    # Unique constraint differences
    if diffs['unique_constraint_diffs']:
        report.append(f"\n## 6. 唯一约束差异的表 ({len(diffs['unique_constraint_diffs'])} 个)")
        for table, uc_diffs in diffs['unique_constraint_diffs'].items():
            report.append(f"\n### 6.1 表: {table}")
            # Unique constraints only in db1
            if uc_diffs['only_in_db1']:
                report.append(f"\n#### 6.1.1 仅在 {db1_name} 中存在的唯一约束")
                report.append("| 约束名 | 约束列 |")
                report.append("|--------|--------|")
                for uc_name, uc_cols in uc_diffs['only_in_db1']:
                    report.append(f"| {uc_name} | {uc_cols} |")
            # Unique constraints only in db2
            if uc_diffs['only_in_db2']:
                report.append(f"\n#### 6.1.2 仅在 {db2_name} 中存在的唯一约束")
                report.append("| 约束名 | 约束列 |")
                report.append("|--------|--------|")
                for uc_name, uc_cols in uc_diffs['only_in_db2']:
                    report.append(f"| {uc_name} | {uc_cols} |")
            # Unique constraint definition differences
            if uc_diffs['definition_diffs']:
                report.append(f"\n#### 6.1.3 唯一约束定义差异")
                report.append(f"| 约束名 | {db1_name} | {db2_name} |")
                report.append("|--------|------------|------------|")
                for uc_name, uc1, uc2 in uc_diffs['definition_diffs']:
                    report.append(f"| {uc_name} | {uc1} | {uc2} |")
    report.append("\n## 7. 总结")
    report.append("差异比较完成!")
    return '\n'.join(report)
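

# For reference, the report this function builds starts like this (file names and table
# counts come from the arguments passed in by main()):
#
#   # 数据库差异报告: niushop_database.sql vs init_v2.0_with_data.sql
#
#   ## 1. 表数量统计
#   | 数据库文件 | 表数量 |
#   |------------|--------|
#   | niushop_database.sql | <db1_table_count> |
#   | init_v2.0_with_data.sql | <db2_table_count> |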


# Entry point
def main():
    # File paths
    db1_path = r'D:\projects\shop-projects\backend\docs\db\niushop_database.sql'
    db2_path = r'D:\projects\shop-projects\backend\docs\db\init_v2.0_with_data.sql'
    report_path = r'D:\projects\shop-projects\backend\docs\db\database_diff_report.md'
    # Parse both database schemas
    print(f"正在解析 {db1_path}...")
    db1 = parse_sql_file(db1_path)
    db1_table_count = len(db1)
    print(f"解析完成,共 {db1_table_count} 个表")
    print(f"\n正在解析 {db2_path}...")
    db2 = parse_sql_file(db2_path, ignore_prefix='lucky_')
    db2_table_count = len(db2)
    print(f"解析完成,共 {db2_table_count} 个表")
    # Compare the two schemas
    print("\n正在比较数据库差异...")
    diffs = compare_databases(db1, db2, 'niushop_database.sql', 'init_v2.0_with_data.sql')
    # Generate the Markdown diff report
    print("\n正在生成Markdown差异报告...")
    report = generate_markdown_report(diffs, 'niushop_database.sql', 'init_v2.0_with_data.sql', db1_table_count, db2_table_count)
    # Save the report to a file
    with open(report_path, 'w', encoding='utf-8') as f:
        f.write(report)
    print(f"\n差异报告已生成: {report_path}")
    print("差异比较完成!")


if __name__ == "__main__":
    main()
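
As a quick, hypothetical smoke test of parse_sql_file, the snippet below can be run alongside the script above; the table definition and the demo_table.sql file name are made up for illustration and are not part of the SQL files in this commit.

sample_sql = (
    "CREATE TABLE `lucky_member` (\n"
    "  `member_id` int NOT NULL AUTO_INCREMENT,\n"
    "  `nickname` varchar(64) NOT NULL DEFAULT '',\n"
    "  PRIMARY KEY (`member_id`),\n"
    "  UNIQUE KEY `uk_nickname` (`nickname`)\n"
    ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;\n"
)
with open('demo_table.sql', 'w', encoding='utf-8') as f:
    f.write(sample_sql)
tables = parse_sql_file('demo_table.sql', ignore_prefix='lucky_')
print(tables['member']['primary_key'])         # member_id
print(tables['member']['unique_constraints'])  # [('uk_nickname', '`nickname`')]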

File diff suppressed because it is too large.

docs/db/niushop_database.sql (new file, 11578 lines)

File diff suppressed because it is too large.


@@ -0,0 +1,117 @@
import re
import os


# Read the contents of an SQL file
def read_sql_file(file_path):
    with open(file_path, 'r', encoding='utf-8') as f:
        return f.read()


# Parse niushop_database.sql and extract table structures and comments
def parse_database_sql(sql_content):
    tables = {}
    # Match every CREATE TABLE statement in a reasonably flexible way: the table-level
    # COMMENT is captured without depending on the order of the ENGINE / CHARACTER SET clauses
    table_pattern = re.compile(r"CREATE TABLE\s+`?([^`\s]+)`?\s*\(([^;]+)\)\s*(?:[^;]+COMMENT\s*=\s*'([^']+)'[^;]*)?\s*;", re.DOTALL | re.IGNORECASE)
    matches = table_pattern.findall(sql_content)
    for table_name, table_def, table_comment in matches:
        if not table_comment:
            # If no table comment was captured, try to find one elsewhere in the definition
            comment_match = re.search(r"COMMENT\s*=\s*'([^']+)'", table_def, re.IGNORECASE)
            if comment_match:
                table_comment = comment_match.group(1)
            else:
                table_comment = ''
        # Parse column definitions and their comments
        columns = {}
        # Match column definitions, including the COMMENT clause
        column_lines = table_def.split('\n')
        for line in column_lines:
            # Match the column name, type and comment; accept both COMMENT 'x' and COMMENT = 'x'
            column_match = re.search(r"\s*([^\s,]+)\s+([^\s,]+)\s*(?:[^,]+COMMENT\s*=?\s*'([^']+)'[^,]*|[^,]*)", line)
            if column_match:
                # Strip surrounding backticks so the name also matches unquoted columns later on
                column_name = column_match.group(1).strip('`')
                column_comment = column_match.group(3) or ''
                if column_comment:
                    columns[column_name] = column_comment
        tables[table_name] = {
            'comment': table_comment,
            'columns': columns
        }
    return tables
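

# For reference, parse_database_sql returns:
#   {table_name: {'comment': table COMMENT text,
#                 'columns': {column_name: column COMMENT text}}}
# Only columns that actually carry a COMMENT appear under 'columns'.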


# Update the comments in an init_v2.0*.sql file from the parsed niushop schema
def update_init_sql(init_sql_path, database_tables):
    # Read the init SQL file
    init_content = read_sql_file(init_sql_path)
    # Match CREATE TABLE statements in the format used by init_v2.0.sql
    table_pattern = re.compile(r"(create table if not exists lucky_([^\s]+)\s*\(([^;]+)\)\s*comment\s*=\s*'[^']*'\s*(.*?);)", re.DOTALL | re.IGNORECASE)

    def replace_table(match):
        full_match = match.group(0)
        table_name = match.group(2)
        table_def = match.group(3)
        table_suffix = match.group(4)
        if table_name in database_tables:
            # Look up the table comment and column comments from the niushop schema
            db_table = database_tables[table_name]
            table_comment = db_table['comment']
            columns = db_table['columns']
            # Update column comments
            new_table_def = table_def
            for column_name, column_comment in columns.items():
                # Match the column definition and replace its comment; expected shape:
                #   <column> <type> [default <value>] [not null] comment '<comment>'
                # re.escape guards against regex metacharacters in column names, and both
                # backticked names and an optional '=' after comment are tolerated
                column_pattern = re.compile(
                    r"(\s*`?" + re.escape(column_name) + r"`?\s+[^\s,]+\s*(?:default\s+[^\s,]+\s*)?(?:not null\s*)?comment\s*=?\s*')([^']*)'([^,]*)",
                    re.IGNORECASE)
                # Use a function replacement so the comment text is inserted literally
                # (a plain "\1..." template breaks when the comment starts with a digit)
                new_table_def = column_pattern.sub(
                    lambda m: m.group(1) + column_comment + "'" + m.group(3), new_table_def)
            # Rebuild the CREATE TABLE statement
            new_full_match = f"create table if not exists lucky_{table_name} ({new_table_def}) comment = '{table_comment}' {table_suffix};"
            return new_full_match
        return full_match

    # Replace every matching table
    updated_content = table_pattern.sub(replace_table, init_content)
    # Write the result back to the file
    with open(init_sql_path, 'w', encoding='utf-8') as f:
        f.write(updated_content)
    print(f"Updated {init_sql_path}")


# Entry point
def main():
    # File paths
    database_sql_path = r'./niushop_database.sql'
    init_v20_sql_path = r'./init_v2.0.sql'
    init_v20_with_data_sql_path = r'./init_v2.0_with_data.sql'
    # Parse niushop_database.sql
    print("Parsing niushop_database.sql...")
    database_content = read_sql_file(database_sql_path)
    database_tables = parse_database_sql(database_content)
    print(f"Found {len(database_tables)} tables in niushop_database.sql")
    # Update init_v2.0.sql
    if os.path.exists(init_v20_sql_path):
        print("Updating init_v2.0.sql...")
        update_init_sql(init_v20_sql_path, database_tables)
    # Update init_v2.0_with_data.sql
    if os.path.exists(init_v20_with_data_sql_path):
        print("Updating init_v2.0_with_data.sql...")
        update_init_sql(init_v20_with_data_sql_path, database_tables)
    print("All files updated successfully!")


if __name__ == "__main__":
    main()
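
For a rough sense of the mapping parse_database_sql builds, here is a minimal, hypothetical example; the goods table below is made up and is not taken from niushop_database.sql.

sample = (
    "CREATE TABLE `goods` (\n"
    "  `goods_id` int NOT NULL COMMENT '商品id',\n"
    "  `goods_name` varchar(255) NOT NULL DEFAULT '' COMMENT '商品名称',\n"
    "  PRIMARY KEY (`goods_id`)\n"
    ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='商品表';\n"
)
print(parse_database_sql(sample))
# {'goods': {'comment': '商品表',
#            'columns': {'goods_id': '商品id', 'goods_name': '商品名称'}}}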