C#实现Hive建表语句转Snowflake建表语句

发布于:2025-09-13 ⋅ 阅读:(13) ⋅ 点赞:(0)
// cd ./SqlScriptConverter/
// dotnet add package SqlParserCS
// dotnet add package Microsoft.Extensions.ObjectPool --version 9.0.3

using System.Text;
using SqlParser;
using SqlParser.Ast;
using SqlParser.Dialects;

/// <summary>
/// Input settings for <c>StageSqlGenerator.GenerateCreateStageSql</c>; each property
/// feeds one part of the generated <c>CREATE STAGE</c> statement.
/// </summary>
public class StageConfig
{
    /// <summary>Stage name, written verbatim after the <c>STAGE</c> keyword.</summary>
    public string Name { get; set; }
    /// <summary>When true, <c>OR REPLACE</c> is emitted after <c>CREATE</c>.</summary>
    public bool Replace { get; set; }
    /// <summary>When true, <c>IF NOT EXISTS</c> is emitted before the stage name.</summary>
    public bool IfNotExists { get; set; }
    /// <summary>Value of the <c>STORAGE_INTEGRATION</c> clause; the clause is skipped when null or empty.</summary>
    public string StorageIntegration { get; set; }
    /// <summary>Value of the <c>URL</c> clause (emitted single-quoted); skipped when null or empty.</summary>
    public string Url { get; set; }
    /// <summary>Raw text placed inside <c>CREDENTIALS = ( ... )</c>; it is not quoted or escaped by the generator.</summary>
    public string Credentials { get; set; }
    /// <summary>File format settings for the <c>FILE_FORMAT</c> clause; skipped when null.</summary>
    public FileFormatOptions FileFormat { get; set; }
    /// <summary>Key/value pairs rendered as <c>key = value</c> inside <c>COPY_OPTIONS = ( ... )</c>; skipped when null or empty.</summary>
    public Dictionary<string, string> CopyOptions { get; set; }
    /// <summary>Value of the <c>COMMENT</c> clause (emitted single-quoted); skipped when null or empty.</summary>
    public string Comment { get; set; }
}

/// <summary>
/// Describes the <c>FILE_FORMAT</c> clause of a stage: either a reference to a named
/// format or an inline type with options.
/// </summary>
public class FileFormatOptions
{
    /// <summary>Name of an existing file format; when set it is used (as <c>FORMAT_NAME</c>) instead of <see cref="Type"/>.</summary>
    public string FormatName { get; set; }
    /// <summary>Inline format type, emitted unquoted as <c>TYPE = ...</c> when <see cref="FormatName"/> is null or empty.</summary>
    public string Type { get; set; }
    /// <summary>Extra options rendered as <c>key = value</c> after the type; values are emitted verbatim, so callers supply any quoting.</summary>
    public Dictionary<string, string> FormatOptions { get; set; } = new Dictionary<string, string>();
}

/// <summary>
/// Builds a <c>CREATE STAGE</c> statement from a <see cref="StageConfig"/>.
/// </summary>
public class StageSqlGenerator
{
    /// <summary>
    /// Renders the <c>CREATE STAGE</c> statement described by <paramref name="config"/>.
    /// Optional clauses are emitted one per line, in this order: STORAGE_INTEGRATION, URL,
    /// CREDENTIALS, FILE_FORMAT, COPY_OPTIONS, COMMENT. Clauses whose source value is
    /// null or empty are left out.
    /// </summary>
    /// <param name="config">Stage settings; must not be null.</param>
    /// <returns>The SQL text, terminated by <c>;</c> and a newline.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="config"/> is null.</exception>
    public static string GenerateCreateStageSql(StageConfig config)
    {
        if (config == null)
        {
            throw new ArgumentNullException(nameof(config));
        }

        // NOTE(review): Snowflake documents OR REPLACE and IF NOT EXISTS as mutually
        // exclusive; both are emitted here if the caller sets both flags - confirm callers never do.
        var replaceModifier = config.Replace ? "OR REPLACE " : "";
        var ifNotExistsModifier = config.IfNotExists ? "IF NOT EXISTS " : "";
        var clauses = new List<string>();

        AddStorageIntegration(config, clauses);
        AddUrl(config, clauses);
        AddCredentials(config, clauses);
        AddFileFormat(config, clauses);
        AddCopyOptions(config, clauses);
        AddComment(config, clauses);

        return FormatFinalSql(config, replaceModifier, ifNotExistsModifier, clauses);
    }

    /// <summary>Adds the STORAGE_INTEGRATION clause (value unquoted) when one is configured.</summary>
    private static void AddStorageIntegration(StageConfig config, List<string> clauses)
    {
        if (!string.IsNullOrEmpty(config.StorageIntegration))
        {
            clauses.Add($"STORAGE_INTEGRATION = {config.StorageIntegration}");
        }
    }

    /// <summary>Adds the URL clause as an escaped, single-quoted literal when one is configured.</summary>
    private static void AddUrl(StageConfig config, List<string> clauses)
    {
        if (!string.IsNullOrEmpty(config.Url))
        {
            clauses.Add($"URL = '{EscapeSqlString(config.Url)}'");
        }
    }

    /// <summary>Adds the CREDENTIALS clause; the configured text is inserted verbatim between the parentheses.</summary>
    private static void AddCredentials(StageConfig config, List<string> clauses)
    {
        if (!string.IsNullOrEmpty(config.Credentials))
        {
            clauses.Add($"CREDENTIALS = ( {config.Credentials} )");
        }
    }

    /// <summary>
    /// Adds the FILE_FORMAT clause. A named format wins over an inline type; when neither
    /// is set no clause is added.
    /// </summary>
    private static void AddFileFormat(StageConfig config, List<string> clauses)
    {
        var ff = config.FileFormat;
        if (ff == null)
        {
            return;
        }

        if (!string.IsNullOrEmpty(ff.FormatName))
        {
            clauses.Add($"FILE_FORMAT = ( FORMAT_NAME = '{EscapeSqlString(ff.FormatName)}' )");
            return;
        }

        if (string.IsNullOrEmpty(ff.Type))
        {
            return;
        }

        // FormatOptions has a public setter, so a caller may have replaced the default
        // dictionary with null; treat that the same as "no options". The sequence is
        // materialized once because it is both tested for emptiness and joined.
        var options = new List<string>();
        if (ff.FormatOptions != null)
        {
            options.AddRange(ff.FormatOptions.Select(kv => $"{kv.Key} = {kv.Value}"));
        }

        var typeClause = $"TYPE = {ff.Type}";
        clauses.Add(options.Count > 0
            ? $"FILE_FORMAT = ( {typeClause}, {string.Join(", ", options)} )"
            : $"FILE_FORMAT = ( {typeClause} )");
    }

    /// <summary>Adds the COPY_OPTIONS clause when at least one option is configured.</summary>
    private static void AddCopyOptions(StageConfig config, List<string> clauses)
    {
        if (config.CopyOptions == null || config.CopyOptions.Count == 0)
        {
            return;
        }

        var copyOptions = config.CopyOptions.Select(kv => $"{kv.Key} = {kv.Value}");
        clauses.Add($"COPY_OPTIONS = ( {string.Join(", ", copyOptions)} )");
    }

    /// <summary>Adds the COMMENT clause as an escaped, single-quoted literal when one is configured.</summary>
    private static void AddComment(StageConfig config, List<string> clauses)
    {
        if (!string.IsNullOrEmpty(config.Comment))
        {
            clauses.Add($"COMMENT = '{EscapeSqlString(config.Comment)}'");
        }
    }

    /// <summary>Assembles the statement header and the collected clauses (two-space indented, one per line).</summary>
    private static string FormatFinalSql(StageConfig config, string replaceModifier,
        string ifNotExistsModifier, List<string> clauses)
    {
        var header = $"CREATE {replaceModifier}STAGE {ifNotExistsModifier}{config.Name}";
        if (clauses.Count == 0)
        {
            return header + ";\n";
        }

        return header + $"\n  {string.Join("\n  ", clauses)};\n";
    }

    /// <summary>Doubles single quotes so the text can sit inside a single-quoted SQL literal; null becomes an empty string.</summary>
    private static string EscapeSqlString(string input)
    {
        return input?.Replace("'", "''") ?? string.Empty;
    }
}

/// <summary>
/// Lightweight recognizer for the leading keywords of a Hive <c>CREATE TABLE</c> statement.
/// </summary>
public class HiveQLParser
{
    /// <summary>
    /// Checks whether <paramref name="input"/> starts (after optional whitespace) with
    /// <c>CREATE [EXTERNAL | TEMPORARY] TABLE</c> followed by whitespace. Keywords are
    /// matched case-insensitively and must be whole words, so <c>CREATETABLE</c> or
    /// <c>EXTERNALTABLE</c> are rejected.
    /// </summary>
    /// <param name="input">Statement text; null or empty yields false.</param>
    /// <param name="isExternal">True only when the statement is recognized and uses EXTERNAL.</param>
    /// <param name="isTemporary">True only when the statement is recognized and uses TEMPORARY.</param>
    /// <returns>True when the prefix is recognized; otherwise false with both flags false.</returns>
    public static bool ParseCreateTableStatement(string input, out bool isExternal, out bool isTemporary)
    {
        isExternal = false;
        isTemporary = false;

        if (string.IsNullOrEmpty(input))
        {
            return false;
        }

        int index = 0;
        if (!MatchWhitespaceAndKeyword(input, ref index, "CREATE"))
        {
            return false;
        }

        // Work on locals so the out flags stay false when recognition fails later on.
        bool external = false;
        bool temporary = false;
        MatchOptionalClause(input, ref index, ref external, ref temporary);

        if (!MatchWhitespaceAndKeyword(input, ref index, "TABLE"))
        {
            return false;
        }

        // TABLE must be followed by whitespace (and therefore by more text).
        if (index >= input.Length || !IsWhitespace(input[index]))
        {
            return false;
        }

        isExternal = external;
        isTemporary = temporary;
        return true;
    }

    /// <summary>
    /// Consumes an optional EXTERNAL or TEMPORARY keyword. The index is advanced only
    /// when one of them matches; otherwise it is left untouched.
    /// </summary>
    private static void MatchOptionalClause(string input, ref int index, ref bool isExternal, ref bool isTemporary)
    {
        int probe = index;
        if (MatchWhitespaceAndKeyword(input, ref probe, "EXTERNAL"))
        {
            isExternal = true;
            index = probe;
            return;
        }

        probe = index;
        if (MatchWhitespaceAndKeyword(input, ref probe, "TEMPORARY"))
        {
            isTemporary = true;
            index = probe;
        }
    }

    /// <summary>Skips leading whitespace at <paramref name="index"/> and then tries to match <paramref name="keyword"/>.</summary>
    private static bool MatchWhitespaceAndKeyword(string input, ref int index, string keyword)
    {
        while (index < input.Length && IsWhitespace(input[index]))
        {
            index++;
        }

        return MatchKeyword(input, ref index, keyword);
    }

    /// <summary>
    /// Matches <paramref name="keyword"/> case-insensitively at <paramref name="index"/> and
    /// advances past it on success. The keyword must end at a word boundary: the next
    /// character may not be a letter, digit or underscore.
    /// </summary>
    private static bool MatchKeyword(string input, ref int index, string keyword)
    {
        int end = index + keyword.Length;
        if (end > input.Length)
        {
            return false;
        }

        for (int i = 0; i < keyword.Length; i++)
        {
            if (char.ToUpperInvariant(input[index + i]) != char.ToUpperInvariant(keyword[i]))
            {
                return false;
            }
        }

        if (end < input.Length && IsIdentifierChar(input[end]))
        {
            return false;
        }

        index = end;
        return true;
    }

    private static bool IsIdentifierChar(char c) => char.IsLetterOrDigit(c) || c == '_';

    private static bool IsWhitespace(char c) => c == ' ' || c == '\t' || c == '\n' || c == '\r';
}

/// <summary>
/// Pre-processes Hive DDL text so that parameterized complex types such as
/// <c>ARRAY&lt;STRING&gt;</c> become single backtick-quoted tokens.
/// </summary>
public class StringProcessor
{
    /// <summary>
    /// Normalizes spacing around commas and colons, then wraps every whitespace-delimited
    /// token that sits inside a top-level parenthesis pair and contains both '&lt;' and
    /// '&gt;' in backticks.
    /// </summary>
    /// <param name="input">Text to process; must not be null.</param>
    /// <returns>The processed text.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="input"/> is null.</exception>
    public string ProcessString(string input)
    {
        if (input == null)
        {
            throw new ArgumentNullException(nameof(input));
        }

        input = ReplaceCommaSpaces(input);
        var insertPositions = new List<int>();

        foreach (var (start, end) in FindOuterParenthesesPairs(input))
        {
            string inner = input.Substring(start, end - start + 1);

            foreach (var (groupStart, groupEnd) in SplitIntoGroups(inner))
            {
                if (!ContainsBothAngleBrackets(inner, groupStart, groupEnd))
                {
                    continue;
                }

                // Translate token-relative offsets back to offsets in the full text:
                // one backtick goes before the token and one right after it.
                insertPositions.Add(start + groupStart);
                insertPositions.Add(start + groupEnd + 1);
            }
        }

        // Insert from the back so earlier offsets stay valid while the text grows.
        insertPositions.Sort((a, b) => b.CompareTo(a));
        var sb = new StringBuilder(input);
        foreach (int pos in insertPositions)
        {
            sb.Insert(pos, '`');
        }
        return sb.ToString();
    }

    /// <summary>Returns true when the inclusive range of <paramref name="text"/> contains at least one '&lt;' and one '&gt;'.</summary>
    private static bool ContainsBothAngleBrackets(string text, int from, int to)
    {
        bool hasLT = false;
        bool hasGT = false;
        for (int i = from; i <= to && !(hasLT && hasGT); i++)
        {
            if (text[i] == '<')
            {
                hasLT = true;
            }
            else if (text[i] == '>')
            {
                hasGT = true;
            }
        }
        return hasLT && hasGT;
    }

    /// <summary>
    /// Finds the inclusive index ranges of the text between each top-level '(' and its
    /// matching ')'. Empty pairs are skipped. A ')' with no open '(' is ignored, so it
    /// cannot push the nesting depth below zero and hide every later pair.
    /// </summary>
    private List<(int Start, int End)> FindOuterParenthesesPairs(string input)
    {
        var pairs = new List<(int, int)>();
        int depth = 0;
        int currentStart = -1;

        for (int i = 0; i < input.Length; i++)
        {
            char c = input[i];
            if (c == '(')
            {
                depth++;
                if (depth == 1)
                {
                    currentStart = i + 1;
                }
            }
            else if (c == ')')
            {
                if (depth == 0)
                {
                    // Unbalanced closing parenthesis: skip it instead of going negative.
                    continue;
                }

                depth--;
                if (depth == 0 && currentStart != -1)
                {
                    int currentEnd = i - 1;
                    if (currentStart <= currentEnd)
                    {
                        pairs.Add((currentStart, currentEnd));
                    }
                    currentStart = -1;
                }
            }
        }
        return pairs;
    }

    /// <summary>Splits <paramref name="substr"/> on whitespace and returns the inclusive index range of each token.</summary>
    private List<(int Start, int End)> SplitIntoGroups(string substr)
    {
        var groups = new List<(int, int)>();
        int currentGroupStart = -1;

        for (int i = 0; i < substr.Length; i++)
        {
            bool whitespace = IsWhitespace(substr[i]);
            if (whitespace && currentGroupStart != -1)
            {
                groups.Add((currentGroupStart, i - 1));
                currentGroupStart = -1;
            }
            else if (!whitespace && currentGroupStart == -1)
            {
                currentGroupStart = i;
            }
        }

        if (currentGroupStart != -1)
        {
            groups.Add((currentGroupStart, substr.Length - 1));
        }

        return groups;
    }

    private bool IsWhitespace(char c)
    {
        return c == ' ' || c == '\t' || c == '\n' || c == '\r';
    }

    /// <summary>
    /// Removes single spaces around commas and colons until nothing changes (so a type
    /// like <c>MAP&lt;STRING, INT&gt;</c> becomes one token), then puts a space between
    /// '&gt;' and a following comma so the comma is not part of the type token.
    /// </summary>
    private string ReplaceCommaSpaces(string input)
    {
        bool changed;
        do
        {
            string newStr = input.Replace(", ", ",").Replace(" ,", ",").Replace(" :", ":").Replace(": ", ":");
            changed = newStr != input;
            input = newStr;
        } while (changed);

        return input.Replace(">,", "> ,");
    }
}

/// <summary>
/// Converts a Hive <c>CREATE TABLE</c> statement into Snowflake DDL. For external tables a
/// <c>CREATE STAGE</c> statement is generated first, using the <c>external_table_*</c> settings.
/// </summary>
public class HiveToSnowflakeConverter
{
    /// <summary>Value written to the external table's AUTO_REFRESH option.</summary>
    public bool external_table_auto_refresh {get; set;}
    /// <summary>Prefix prepended to the Hive LOCATION to form the stage URL.</summary>
    public string external_table_bucket_prefix {get; set;}
    /// <summary>Value of the external table's AWS_SNS_TOPIC option; the option is omitted when null or empty.</summary>
    public string external_table_aws_sns_topic {get; set;}
    /// <summary>AWS_KEY_ID written into the stage credentials.</summary>
    public string external_table_aws_key_id {get; set;}
    /// <summary>AWS_SECRET_KEY written into the stage credentials.</summary>
    public string external_table_aws_secret_key {get; set;}
    /// <summary>FIELD_DELIMITER written into the stage file format.</summary>
    public string external_table_field_delimiter {get; set;}
    /// <summary>SKIP_HEADER written into the stage file format.</summary>
    public int external_table_field_skip_header {get; set;}

    // Keys are either the parser's data-type record names (e.g. "StringType") or the
    // leading keyword of a backtick-quoted complex type (e.g. "ARRAY"). Lookup ignores
    // case so lower-case Hive type keywords such as "array<string>" resolve as well.
    private readonly Dictionary<string, string> DataTypeMapping = new(StringComparer.OrdinalIgnoreCase)
    {
        { "TinyInt", "TINYINT" },
        { "SmallInt", "SMALLINT" },
        { "Int", "INT" },
        { "BigInt", "BIGINT" },
        { "Float", "FLOAT" },
        { "Double", "FLOAT" },
        { "Decimal", "DECIMAL" },
        { "Boolean", "BOOLEAN" },
        { "StringType", "VARCHAR" },
        { "Char", "CHAR" },
        { "Binary", "BINARY" },
        { "Timestamp", "TIMESTAMP" },
        { "Date", "DATE" },
        { "ARRAY", "ARRAY" },
        { "MAP", "VARCHAR" },
        { "STRUCT", "OBJECT" },
        { "UNIONTYPE", "STRING" },
        { "INTERVAL", "STRING" },
        { "TIMESTAMP WITH LOCAL TIME ZONE", "VARCHAR" }
    };

    /// <summary>
    /// Converts <paramref name="hiveSql"/> to Snowflake DDL.
    /// </summary>
    /// <param name="hiveSql">Hive statement text.</param>
    /// <param name="dialect">Dialect handed to the SQL parser.</param>
    /// <returns>
    /// The Snowflake SQL, or an empty string when the text is null/blank or does not start
    /// with <c>CREATE [EXTERNAL | TEMPORARY] TABLE</c>.
    /// </returns>
    /// <exception cref="ArgumentException">The parser did not produce a CREATE TABLE statement.</exception>
    public string CreateTable(string hiveSql, HiveDialect dialect)
    {
        if (string.IsNullOrWhiteSpace(hiveSql))
        {
            return "";
        }

        if (!HiveQLParser.ParseCreateTableStatement(hiveSql, out bool actualExternal, out bool actualTemporary))
        {
            return "";
        }

        SqlQueryParser parser = new SqlQueryParser();
        var statements = parser.Parse(hiveSql, dialect);

        if (statements.Count == 0 || statements.First() is not Statement.CreateTable createTable)
        {
            throw new ArgumentException("Not a valid Hive CREATE TABLE statement");
        }

        var hiveTableInfo = ParseHiveCreateTable(createTable);
        return GenerateSnowflakeCreateTable(hiveTableInfo, actualExternal, actualTemporary);
    }

    /// <summary>
    /// Extracts the table-level settings and the column list (regular columns followed by
    /// partition columns) from the parsed statement.
    /// </summary>
    private HiveTableInfo ParseHiveCreateTable(Statement.CreateTable createTable)
    {
        var body = createTable.Element;
        var partitioned = body.HiveDistribution as HiveDistributionStyle.Partitioned;
        var partitionDefs = partitioned == null ? null : partitioned.Columns;

        var info = new HiveTableInfo
        {
            TableName = ToSnowflakeIdentifier(body.Name.ToString()),
            OrReplace = body.OrReplace,
            IfNotExists = body.IfNotExists,
            TableComment = body.Comment?.Comment.ToString(),
            PartitionColumns = partitionDefs == null
                ? new List<string>()
                : partitionDefs.Select(p => ToSnowflakeIdentifier(p.Name.ToString())).ToList(),
            ClusteredColumns = body.ClusteredBy?.Columns.Select(p => ToSnowflakeIdentifier(p.ToString())).ToList()
                ?? new List<string>(),
            // Storage may be something other than a plain file format; in that case the
            // cast yields null and the format is reported as unknown ("").
            StorageFormat = (body.HiveFormats?.Storage as HiveIOFormat.FileFormat)?.Format.ToString() ?? "",
            Location = body.HiveFormats?.Location?.ToString() ?? "",
            Columns = new List<ColumnInfo>()
        };

        foreach (var column in body.Columns)
        {
            info.Columns.Add(new ColumnInfo
            {
                Name = ToSnowflakeIdentifier(column.Name.ToString()),
                DataType = ResolveSnowflakeType(column.DataType),
                // Only the first option is carried over; guard against an empty option list.
                Comment = column.Options == null || column.Options.Count == 0
                    ? ""
                    : column.Options[0].Option.ToSql()
            });
        }

        if (partitionDefs != null)
        {
            // Hive keeps partition columns out of the column list; the generated table
            // declares them as ordinary columns after the regular ones.
            foreach (var partitionColumn in partitionDefs)
            {
                info.Columns.Add(new ColumnInfo
                {
                    Name = ToSnowflakeIdentifier(partitionColumn.Name.ToString()),
                    DataType = ResolveSnowflakeType(partitionColumn.DataType),
                    Comment = ""
                });
            }
        }

        return info;
    }

    /// <summary>Replaces Hive backtick quoting with double-quote identifier quoting.</summary>
    private static string ToSnowflakeIdentifier(string hiveIdentifier)
    {
        return hiveIdentifier.Replace("`", "\"");
    }

    /// <summary>
    /// Turns a parsed data type into Snowflake type text. The first word of the type's
    /// string form is its kind; "Custom" types carry the original type text between
    /// backticks (added by the pre-processing step for complex types).
    /// </summary>
    private string ResolveSnowflakeType(DataType hiveType)
    {
        string rendered = hiveType.ToString();
        string kind = rendered.Split(' ')[0];

        if (kind == "Decimal")
        {
            var ps = (hiveType as DataType.Decimal)?.ExactNumberInfo as ExactNumberInfo.PrecisionAndScale;
            // Without both precision and scale fall back to a bare DECIMAL rather than
            // emitting the malformed "DECIMAL(, )".
            return ps == null ? "DECIMAL" : $"DECIMAL({ps.Length}, {ps.Scale})";
        }

        if (kind == "Custom")
        {
            var parts = rendered.Split('`');
            if (parts.Length > 1)
            {
                kind = parts[1];
            }
            // A custom type without backticks stays "Custom" and maps to the VARCHAR default.
        }

        return MapHiveToSnowflakeType(kind);
    }

    /// <summary>
    /// Looks up the Snowflake type for a Hive type name. Parameterized types such as
    /// <c>STRUCT&lt;...&gt;</c> are resolved by the keyword before '&lt;'. Unknown types
    /// map to VARCHAR.
    /// </summary>
    private string MapHiveToSnowflakeType(string hiveType)
    {
        if (string.IsNullOrWhiteSpace(hiveType))
        {
            return "VARCHAR";
        }

        string name = hiveType.Trim();
        if (DataTypeMapping.TryGetValue(name, out var snowflakeType))
        {
            return snowflakeType;
        }

        int angle = name.IndexOf('<');
        if (angle > 0 && DataTypeMapping.TryGetValue(name.Substring(0, angle).Trim(), out var byKeyword))
        {
            return byKeyword;
        }

        return "VARCHAR";
    }

    /// <summary>
    /// Renders the Snowflake statement(s). External tables get a CREATE STAGE statement
    /// followed by CREATE EXTERNAL TABLE with PARTITION BY / LOCATION / AUTO_REFRESH /
    /// FILE_FORMAT / AWS_SNS_TOPIC; other tables get a single CLUSTER BY built from the
    /// Hive partition and bucketing columns. COMMENT is emitted last and the statement is
    /// terminated exactly once.
    /// </summary>
    private string GenerateSnowflakeCreateTable(HiveTableInfo info, bool actualExternal, bool actualTemporary)
    {
        // Build each column line without a trailing space when there is no option text,
        // so no global " ," clean-up (which could also alter comment text) is needed.
        var columnLines = info.Columns.Select(col => string.IsNullOrEmpty(col.Comment)
            ? $"    {col.Name} {col.DataType}"
            : $"    {col.Name} {col.DataType} {col.Comment}");

        // No STORED AS clause means Hive's default text format, which is handled like TextFile.
        string format = string.IsNullOrEmpty(info.StorageFormat) || info.StorageFormat == "TextFile"
            ? "CSV"
            : info.StorageFormat;
        string tableName = info.TableName.Contains(" ") ? info.TableName : info.TableName.Trim('\"');
        string stage = info.TableName.Trim('\"').Replace(" ", "_") + "_stage";

        var sql = new StringBuilder();
        var tableClauses = new List<string>();

        if (actualExternal)
        {
            var config = new StageConfig
            {
                Name = stage,
                Replace = true,
                Url = external_table_bucket_prefix + info.Location,
                Credentials = $"AWS_KEY_ID='{external_table_aws_key_id}' AWS_SECRET_KEY='{external_table_aws_secret_key}'",
                FileFormat = new FileFormatOptions
                {
                    Type = format,
                    FormatOptions = new Dictionary<string, string>
                    {
                        { "FIELD_DELIMITER", $"'{external_table_field_delimiter}'" },
                        { "SKIP_HEADER", $"{external_table_field_skip_header}" }
                    }
                },
                CopyOptions = new Dictionary<string, string>
                {
                    { "ON_ERROR", "'CONTINUE'" }
                },
                Comment = $"stage for {tableName}"
            };
            sql.Append(StageSqlGenerator.GenerateCreateStageSql(config));

            if (info.PartitionColumns.Any())
            {
                tableClauses.Add($"PARTITION BY ({string.Join(", ", info.PartitionColumns)})");
            }
            tableClauses.Add($"LOCATION=@{stage}");
            string autoRefresh = external_table_auto_refresh ? "TRUE" : "FALSE";
            tableClauses.Add($"AUTO_REFRESH = {autoRefresh}");
            tableClauses.Add($"FILE_FORMAT = (TYPE = {format})");
            if (!string.IsNullOrEmpty(external_table_aws_sns_topic))
            {
                tableClauses.Add($"AWS_SNS_TOPIC = '{external_table_aws_sns_topic}'");
            }
            // Hive bucketing columns are not emitted here: the documented
            // CREATE EXTERNAL TABLE syntax has no clustering clause.
        }
        else
        {
            // Snowflake spells the clause CLUSTER BY and allows it once, so Hive partition
            // and bucketing columns are merged into a single key list.
            var clusterKeys = info.PartitionColumns.Concat(info.ClusteredColumns).Distinct().ToList();
            if (clusterKeys.Count > 0)
            {
                tableClauses.Add($"CLUSTER BY ({string.Join(", ", clusterKeys)})");
            }
        }

        if (info.TableComment != null)
        {
            string escapedComment = info.TableComment.Replace("'", "''");
            tableClauses.Add($"COMMENT = '{escapedComment}'");
        }

        string orReplace = info.OrReplace ? "OR REPLACE " : "";
        string external = actualExternal ? "EXTERNAL " : "";
        string temporary = actualTemporary ? "TEMPORARY " : "";
        string ifNotExists = info.IfNotExists ? "IF NOT EXISTS " : "";

        sql.Append('\n');
        sql.Append($"CREATE {orReplace}{external}{temporary}TABLE {ifNotExists}{tableName} (\n");
        sql.Append(string.Join(",\n", columnLines));
        sql.Append("\n)");
        foreach (var clause in tableClauses)
        {
            sql.Append('\n').Append(clause);
        }
        sql.Append(";\n");

        return sql.ToString();
    }
}

/// <summary>
/// Table-level information extracted from a parsed Hive CREATE TABLE statement and
/// consumed when the Snowflake statement is generated.
/// </summary>
public class HiveTableInfo
{
    /// <summary>Table name as rendered by the parser, with backticks replaced by double quotes.</summary>
    public string TableName { get; set; }
    /// <summary>True when the source statement used OR REPLACE.</summary>
    public bool OrReplace { get; set; }
    /// <summary>True when the source statement used IF NOT EXISTS.</summary>
    public bool IfNotExists { get; set; }
    /// <summary>Columns of the generated table: the regular columns followed by the partition columns.</summary>
    public List<ColumnInfo> Columns { get; set; }
    /// <summary>Table comment text, or null when the statement has none.</summary>
    public string TableComment { get; set; }
    /// <summary>Names of the Hive partition columns; empty when the table is not partitioned.</summary>
    public List<string> PartitionColumns { get; set; }
    /// <summary>Names of the Hive CLUSTERED BY columns; empty when there are none.</summary>
    public List<string> ClusteredColumns { get; set; }
    /// <summary>Storage format name as reported by the parser, or an empty string when none was specified.</summary>
    public string StorageFormat { get; set; }
    /// <summary>Hive LOCATION value, or an empty string when none was specified.</summary>
    public string Location { get; set; }
}

/// <summary>One column of the generated Snowflake table.</summary>
public class ColumnInfo
{
    /// <summary>Column name, with Hive backticks replaced by double quotes.</summary>
    public string Name { get; set; }
    /// <summary>Snowflake data type text written after the column name.</summary>
    public string DataType { get; set; }
    /// <summary>
    /// Text appended after the data type in the column definition. It is filled from the
    /// column's first parsed option and is empty when there is none.
    /// </summary>
    public string Comment { get; set; }
}

/// <summary>
/// Demo entry point: dispatches a statement on its first non-whitespace character and,
/// for statements starting with 'c', converts it to Snowflake DDL and prints the result.
/// </summary>
class Program
{
    /// <summary>
    /// Routes <paramref name="hiveSql"/> to a handler chosen by its first non-whitespace
    /// character (compared in lower case). Nothing happens when the text contains only
    /// whitespace.
    /// </summary>
    public static void ParseHiveQL(string hiveSql)
    {
        foreach (char ch in hiveSql)
        {
            if (char.IsWhiteSpace(ch))
            {
                continue;
            }

            // Only the first significant character matters; handle it and stop.
            switch (char.ToLower(ch))
            {
                case 'a':
                    a(hiveSql);
                    break;
                case 'b':
                    b(hiveSql);
                    break;
                case 'c':
                    c(hiveSql);
                    break;
                default:
                    d(hiveSql);
                    break;
            }
            return;
        }
    }

    /// <summary>Placeholder handler for statements starting with 'a'.</summary>
    private static void a(string hiveSql) => Console.WriteLine(" a");

    /// <summary>Placeholder handler for statements starting with 'b'.</summary>
    private static void b(string hiveSql) => Console.WriteLine(" b");

    /// <summary>
    /// Handler for statements starting with 'c': pre-processes the text, converts it with
    /// sample external-table settings and prints the result or the error message.
    /// </summary>
    private static void c(string hiveSql)
    {
        var dialect = new HiveDialect();
        var preprocessor = new StringProcessor();
        string prepared = preprocessor.ProcessString(hiveSql);

        try
        {
            var converter = new HiveToSnowflakeConverter
            {
                external_table_auto_refresh = true,
                external_table_bucket_prefix = "s3://s3_mybucket",
                external_table_aws_sns_topic = "arn:aws:sns:us-west-2:001234567890:s3_mybucket",
                external_table_aws_key_id = "AKIAXX",
                external_table_aws_secret_key = "xxx",
                external_table_field_delimiter = ",",
                external_table_field_skip_header = 1
            };

            Console.WriteLine(converter.CreateTable(prepared, dialect));
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Error: {ex.Message}");
        }
    }

    /// <summary>Placeholder handler for every other statement.</summary>
    private static void d(string hiveSql) => Console.WriteLine(" d");

    /// <summary>Runs the demo against a sample Hive table that uses every data type.</summary>
    static void Main(string[] args)
    {
        string hiveSql = @"
            CREATE TABLE IF NOT EXISTS hive_all_data_types_table (
    -- 基本数据类型
    tinyint_col      TINYINT,
    smallint_col     SMALLINT,
    int_col          INT,
    bigint_col       BIGINT,
    float_col        FLOAT,
    double_col       DOUBLE,
    decimal_col      DECIMAL(10, 2),  -- 需指定精度和小数位
    timestamp_col    TIMESTAMP,
    date_col         DATE,
    string_col       STRING,
    varchar_col      VARCHAR(255),    -- 需指定最大长度
    char_col         CHAR(10),        -- 需指定固定长度
    binary_col       BINARY,
    boolean_col      BOOLEAN,

    -- 复杂数据类型
    array_col        ARRAY<STRING>,
    map_col          MAP<STRING, INT>,
    struct_col       STRUCT<name:STRING, age:INT>,
    union_col        UNIONTYPE<INT, STRING, BOOLEAN>  -- 联合类型(Hive 0.7.0+)
)
COMMENT '包含所有Hive数据类型的内部表'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\001'          -- 字段分隔符
COLLECTION ITEMS TERMINATED BY '\002' -- 集合元素分隔符
MAP KEYS TERMINATED BY '\003'         -- Map键值分隔符
LINES TERMINATED BY '\n'              -- 行分隔符
STORED AS TEXTFILE;                   -- 存储格式(默认格式)";
        ParseHiveQL(hiveSql);
    }
}

网站公告

今日签到

点亮在社区的每一天
去签到