| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165 |
- import os
- import shutil
- from datetime import datetime, timedelta
- def print_sql_files():
- """返回 Origin 目录下所有 sql 文件的内容字典
-
- Returns:
- dict: 字典,key 是文件名,value 是文件内容
- """
- # 获取当前文件所在目录
- current_dir = os.path.dirname(os.path.abspath(__file__))
- # 构建 Origin 目录路径
- origin_dir = os.path.join(current_dir, "Origin")
-
- # 创建结果字典
- sql_files_dict = {}
-
- # 遍历 Origin 目录
- for file_name in os.listdir(origin_dir):
- # 检查是否为 .sql 文件
- if file_name.endswith(".sql"):
- # 构建完整路径
- full_path = os.path.join(origin_dir, file_name)
- # 读取文件内容
- try:
- with open(full_path, 'r', encoding='utf-8') as f:
- content = f.read()
- sql_files_dict[file_name] = content
- except Exception as e:
- print(f"读取文件 {file_name} 时出错: {e}")
-
- return sql_files_dict
- def get_month_data(month=None):
- """获取并返回指定月份或当前月份的相关数据
-
- Args:
- month: 可选参数,格式为 YYYYMM (如 "202602"),如果不传则使用当前月份
-
- 返回格式:
- ["202602", "202603", "202601", "2026-02"]
- 分别对应:指定月份、次月、上月、指定月份(带分隔符)
- """
- # 获取日期
- if month:
- # 如果传入了月份,解析为日期
- # 解析 YYYYMM 格式的月份
- year = int(month[:4])
- month_num = int(month[4:])
- now = datetime(year, month_num, 1)
- else:
- # 如果没有传入月份,使用当前日期
- now = datetime.now()
-
- # 当前月份 (格式: YYYYMM)
- current_month = now.strftime("%Y%m")
-
- # 次月
- next_month = (now.replace(day=28) + timedelta(days=4)).strftime("%Y%m")
-
- # 上月
- last_month = (now.replace(day=1) - timedelta(days=1)).strftime("%Y%m")
-
- # 当前月份 (带分隔符,格式: YYYY-MM)
- current_month_with_separator = now.strftime("%Y-%m")
-
- return [current_month, next_month, last_month, current_month_with_separator]
- def ensure_directory(directory_path):
- """判断指定目录是否存在,不存在则创建,存在则删除里面的内容
-
- Args:
- directory_path: 要检查的目录路径
- """
- print(f"处理目录: {directory_path}")
-
- # 检查目录是否存在
- if not os.path.exists(directory_path):
- # 目录不存在,创建目录
- os.makedirs(directory_path)
- print(f"目录不存在,已创建: {directory_path}")
- else:
- # 目录存在,删除里面的内容
- print(f"目录存在,删除里面的内容")
- for item in os.listdir(directory_path):
- item_path = os.path.join(directory_path, item)
- if os.path.isfile(item_path):
- os.remove(item_path)
- print(f"删除文件: {item_path}")
- elif os.path.isdir(item_path):
- shutil.rmtree(item_path)
- print(f"删除目录: {item_path}")
- def generate_sql_files(month=None):
- """生成SQL文件
-
- Args:
- month: 可选参数,格式为 YYYYMM (如 "202602"),如果不传则使用当前月份
-
- 步骤:
- 1. 调用 ensure_directory 确保 SqlOut 目录存在且为空
- 2. 调用 get_month_data 读取参数列表
- 3. 调用 print_sql_files 获得文件键值对
- 4. 遍历步骤3的返回值,每条把结果按照步骤2的返回值进行string替换其中{0}{1}{2}{3}的值
- 5. 按照key值输出到步骤1所说的目录里
- """
- # 对传入的月份参数进行格式校验
- if month:
- # 检查长度是否为6位
- if len(month) != 6:
- raise ValueError(f"月份参数格式错误: {month},应为6位数字,格式为 YYYYMM")
- # 检查是否为数字
- if not month.isdigit():
- raise ValueError(f"月份参数格式错误: {month},应为数字")
- # 检查月份是否在有效范围内 (1-12)
- month_num = int(month[4:])
- if month_num < 1 or month_num > 12:
- raise ValueError(f"月份参数格式错误: {month},月份应在 1-12 之间")
-
- # 步骤1: 确保 SqlOut 目录存在且为空
- current_dir = os.path.dirname(os.path.abspath(__file__))
- sql_out_dir = os.path.join(current_dir, "SqlOut")
- ensure_directory(sql_out_dir)
-
- # 步骤2: 读取参数列表
- month_data = get_month_data(month)
- print(f"\n获取到的月份参数: {month_data}")
-
- # 步骤3: 获得文件键值对
- sql_files = print_sql_files()
- print(f"\n获取到 {len(sql_files)} 个 SQL 文件")
-
- # 步骤4-5: 处理并输出文件
- print("\n开始处理并输出文件:")
- for file_name, content in sql_files.items():
- # 替换内容中的 {0}{1}{2}{3}
- try:
- # 使用 str.format 方法替换占位符
- replaced_content = content.format(*month_data)
-
- # 构建输出文件路径
- output_path = os.path.join(sql_out_dir, file_name)
-
- # 写入文件
- with open(output_path, 'w', encoding='utf-8') as f:
- f.write(replaced_content)
-
- print(f"成功输出文件: {file_name}")
- except Exception as e:
- print(f"处理文件 {file_name} 时出错: {e}")
- import sys
- if __name__ == "__main__":
- # 检查命令行参数
- if len(sys.argv) > 1:
- # 获取传入的月份参数
- month = sys.argv[1]
- print(f"使用传入的月份参数: {month}")
- generate_sql_files(month)
- else:
- # 没有传入参数,使用当前月份
- print("没有传入月份参数,使用当前月份")
- generate_sql_files()
|