InitMonthSqlFunction.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. import os
  2. import shutil
  3. from datetime import datetime, timedelta
  4. def print_sql_files():
  5. """返回 Origin 目录下所有 sql 文件的内容字典
  6. Returns:
  7. dict: 字典,key 是文件名,value 是文件内容
  8. """
  9. # 获取当前文件所在目录
  10. current_dir = os.path.dirname(os.path.abspath(__file__))
  11. # 构建 Origin 目录路径
  12. origin_dir = os.path.join(current_dir, "Origin")
  13. # 创建结果字典
  14. sql_files_dict = {}
  15. # 遍历 Origin 目录
  16. for file_name in os.listdir(origin_dir):
  17. # 检查是否为 .sql 文件
  18. if file_name.endswith(".sql"):
  19. # 构建完整路径
  20. full_path = os.path.join(origin_dir, file_name)
  21. # 读取文件内容
  22. try:
  23. with open(full_path, 'r', encoding='utf-8') as f:
  24. content = f.read()
  25. sql_files_dict[file_name] = content
  26. except Exception as e:
  27. print(f"读取文件 {file_name} 时出错: {e}")
  28. return sql_files_dict
  29. def get_month_data():
  30. """获取并返回当前月份和次月等数据
  31. 返回格式:
  32. ["202602", "202603", "202601", "2026-02"]
  33. 分别对应:当前月份、次月、上月、当前月份(带分隔符)
  34. """
  35. # 获取当前日期
  36. now = datetime.now()
  37. # 当前月份 (格式: YYYYMM)
  38. current_month = now.strftime("%Y%m")
  39. # 次月
  40. next_month = (now.replace(day=28) + timedelta(days=4)).strftime("%Y%m")
  41. # 上月
  42. last_month = (now.replace(day=1) - timedelta(days=1)).strftime("%Y%m")
  43. # 当前月份 (带分隔符,格式: YYYY-MM)
  44. current_month_with_separator = now.strftime("%Y-%m")
  45. return [current_month, next_month, last_month, current_month_with_separator]
  46. def ensure_directory(directory_path):
  47. """判断指定目录是否存在,不存在则创建,存在则删除里面的内容
  48. Args:
  49. directory_path: 要检查的目录路径
  50. """
  51. print(f"处理目录: {directory_path}")
  52. # 检查目录是否存在
  53. if not os.path.exists(directory_path):
  54. # 目录不存在,创建目录
  55. os.makedirs(directory_path)
  56. print(f"目录不存在,已创建: {directory_path}")
  57. else:
  58. # 目录存在,删除里面的内容
  59. print(f"目录存在,删除里面的内容")
  60. for item in os.listdir(directory_path):
  61. item_path = os.path.join(directory_path, item)
  62. if os.path.isfile(item_path):
  63. os.remove(item_path)
  64. print(f"删除文件: {item_path}")
  65. elif os.path.isdir(item_path):
  66. shutil.rmtree(item_path)
  67. print(f"删除目录: {item_path}")
  68. def generate_sql_files():
  69. """生成SQL文件
  70. 步骤:
  71. 1. 调用 ensure_directory 确保 SqlOut 目录存在且为空
  72. 2. 调用 get_month_data 读取参数列表
  73. 3. 调用 print_sql_files 获得文件键值对
  74. 4. 遍历步骤3的返回值,每条把结果按照步骤2的返回值进行string替换其中{0}{1}{2}{3}的值
  75. 5. 按照key值输出到步骤1所说的目录里
  76. """
  77. # 步骤1: 确保 SqlOut 目录存在且为空
  78. current_dir = os.path.dirname(os.path.abspath(__file__))
  79. sql_out_dir = os.path.join(current_dir, "SqlOut")
  80. ensure_directory(sql_out_dir)
  81. # 步骤2: 读取参数列表
  82. month_data = get_month_data()
  83. print(f"\n获取到的月份参数: {month_data}")
  84. # 步骤3: 获得文件键值对
  85. sql_files = print_sql_files()
  86. print(f"\n获取到 {len(sql_files)} 个 SQL 文件")
  87. # 步骤4-5: 处理并输出文件
  88. print("\n开始处理并输出文件:")
  89. for file_name, content in sql_files.items():
  90. # 替换内容中的 {0}{1}{2}{3}
  91. try:
  92. # 先处理文件中可能存在的单独大括号,将它们转义
  93. # 然后使用format方法进行格式化
  94. # 注意:需要先将文件中的单独大括号转义为双大括号
  95. # 这样format方法就不会将它们视为占位符
  96. # 但要注意不要影响现有的{0}{1}{2}{3}占位符
  97. # 先保存现有的占位符
  98. temp_content = content
  99. placeholders = {}
  100. for i in range(4):
  101. placeholder = f"{{{i}}}"
  102. temp_content = temp_content.replace(placeholder, f"__PLACEHOLDER_{i}__")
  103. # 转义单独的大括号
  104. temp_content = temp_content.replace("{", "{{").replace("}", "}}")
  105. # 恢复占位符
  106. for i in range(4):
  107. temp_content = temp_content.replace(f"__PLACEHOLDER_{i}__", f"{{{i}}}")
  108. # 使用format方法进行格式化
  109. replaced_content = temp_content.format(*month_data)
  110. # 构建输出文件路径
  111. output_path = os.path.join(sql_out_dir, file_name)
  112. # 写入文件
  113. with open(output_path, 'w', encoding='utf-8') as f:
  114. f.write(replaced_content)
  115. print(f"成功输出文件: {file_name}")
  116. except Exception as e:
  117. print(f"处理文件 {file_name} 时出错: {e}")
  118. if __name__ == "__main__":
  119. generate_sql_files()