Merge v0
This commit is contained in:
parent
2aedd2625f
commit
101f371ed9
@ -23,6 +23,8 @@ namespace AsPartitionProcessing.SampleClient
|
||||
modelsConfig = InitializeFromDatabase();
|
||||
}
|
||||
|
||||
//PartitionProcessor.MergeMonthsToYear(modelsConfig[0], LogMessage, "Internet Sales", "2012");
|
||||
|
||||
foreach (ModelConfiguration modelConfig in modelsConfig)
|
||||
{
|
||||
if (!modelConfig.IntegratedAuth) //For Azure AS
|
||||
|
@ -123,7 +123,7 @@ namespace AsPartitionProcessing
|
||||
{
|
||||
command.Connection = connection;
|
||||
command.CommandType = CommandType.Text;
|
||||
command.CommandText = "DELETE FROM [dbo].[PartitionedModelLog];";
|
||||
command.CommandText = "DELETE FROM [dbo].[ProcessingLog];";
|
||||
command.ExecuteNonQuery();
|
||||
}
|
||||
}
|
||||
@ -144,7 +144,7 @@ namespace AsPartitionProcessing
|
||||
command.Connection = connection;
|
||||
command.CommandType = CommandType.Text;
|
||||
command.CommandText = @"
|
||||
INSERT INTO [dbo].[PartitionedModelLog]
|
||||
INSERT INTO [dbo].[ProcessingLog]
|
||||
([ModelConfigurationID]
|
||||
,[ExecutionID]
|
||||
,[LogDateTime]
|
||||
|
@ -27,9 +27,15 @@ namespace AsPartitionProcessing
|
||||
/// </summary>
|
||||
public static class PartitionProcessor
|
||||
{
|
||||
#region Member Variables
|
||||
|
||||
private static ModelConfiguration _modelConfiguration;
|
||||
private static LogMessageDelegate _messageLogger;
|
||||
|
||||
#endregion
|
||||
|
||||
#region Public Methods
|
||||
|
||||
/// <summary>
|
||||
/// Partitions tables in a tabular model based on configuration
|
||||
/// </summary>
|
||||
@ -80,11 +86,11 @@ namespace AsPartitionProcessing
|
||||
}
|
||||
else
|
||||
{
|
||||
//Non-partitioned table. Process based on partitioning configuration(s).
|
||||
//Partitioned table. Process based on partitioning configuration(s).
|
||||
Partition templatePartition = table.Partitions.Find(tableConfiguration.AnalysisServicesTable);
|
||||
if (templatePartition == null)
|
||||
{
|
||||
throw new Microsoft.AnalysisServices.ConnectionException($"Table {tableConfiguration.AnalysisServicesTable} does not contain a partition with the same name to act as the template partition.");
|
||||
throw new InvalidOperationException($"Table {tableConfiguration.AnalysisServicesTable} does not contain a partition with the same name to act as the template partition.");
|
||||
}
|
||||
|
||||
foreach (PartitioningConfiguration partitioningConfiguration in tableConfiguration.PartitioningConfigurations)
|
||||
@ -113,20 +119,7 @@ namespace AsPartitionProcessing
|
||||
}
|
||||
|
||||
//Process partitions
|
||||
string selectQueryTemplate;
|
||||
switch (partitioningConfiguration.Granularity)
|
||||
{
|
||||
case Granularity.Daily:
|
||||
selectQueryTemplate = "SELECT * FROM {0} WHERE {1} = {2} ORDER BY {1}";
|
||||
break;
|
||||
case Granularity.Monthly:
|
||||
selectQueryTemplate = "SELECT * FROM {0} WHERE FLOOR({1} / 100) = {2} ORDER BY {1}";
|
||||
break;
|
||||
default: //Granularity.Yearly:
|
||||
selectQueryTemplate = "SELECT * FROM {0} WHERE FLOOR({1} / 10000) = {2} ORDER BY {1}";
|
||||
break;
|
||||
}
|
||||
|
||||
string selectQueryTemplate = DeriveSelectQueryTemplate(partitioningConfiguration.Granularity);
|
||||
foreach (string partitionKey in partitionKeysForProcessing)
|
||||
{
|
||||
Partition partitionToProcess = table.Partitions.Find(partitionKey);
|
||||
@ -216,7 +209,10 @@ namespace AsPartitionProcessing
|
||||
LogMessage("", false);
|
||||
LogMessage($"Exception occurred: {DateTime.Now.ToString("hh:mm:ss tt")}", false);
|
||||
LogMessage($"Exception message: {exc.Message}", false);
|
||||
LogMessage($"Inner exception message: {exc.InnerException.Message}", false);
|
||||
if (exc.InnerException != null)
|
||||
{
|
||||
LogMessage($"Inner exception message: {exc.InnerException.Message}", false);
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
@ -230,6 +226,143 @@ namespace AsPartitionProcessing
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Merge month partitions into a year partition.
|
||||
/// </summary>
|
||||
/// <param name="modelConfiguration">Configuration info for the model</param>
|
||||
/// <param name="messageLogger">Pointer to logging method</param>
|
||||
/// <param name="analysisServicesTable">Name of the partitioned table in the tabular model.</param>
|
||||
/// <param name="yearPartitionKey">Year partition key following yyyy</param>
|
||||
public static void MergeMonthsToYear(ModelConfiguration modelConfiguration, LogMessageDelegate messageLogger, string analysisServicesTable, string yearPartitionKey)
|
||||
{
|
||||
_modelConfiguration = modelConfiguration;
|
||||
_messageLogger = messageLogger;
|
||||
|
||||
Server server = new Server();
|
||||
try
|
||||
{
|
||||
//Check new partition key is expected format
|
||||
int partitionKeyParsed;
|
||||
if (yearPartitionKey.Length != 4 || !int.TryParse(yearPartitionKey, out partitionKeyParsed))
|
||||
{
|
||||
throw new InvalidOperationException($"Partition key {yearPartitionKey} is not of expected format.");
|
||||
}
|
||||
|
||||
//Check the model configuration contains the partitioned table
|
||||
bool foundMatch = false;
|
||||
PartitioningConfiguration partitionConfig = null;
|
||||
foreach (TableConfiguration tableConfig in modelConfiguration.TableConfigurations)
|
||||
{
|
||||
if (tableConfig.AnalysisServicesTable == analysisServicesTable && tableConfig.PartitioningConfigurations.Count > 0)
|
||||
{
|
||||
partitionConfig = tableConfig.PartitioningConfigurations[0];
|
||||
foundMatch = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!foundMatch)
|
||||
{
|
||||
throw new InvalidOperationException($"Table {analysisServicesTable} not found in model configuration with at least one partitioning configuration (required for source table and column names).");
|
||||
}
|
||||
|
||||
Database database;
|
||||
Connect(server, out database);
|
||||
|
||||
Table table = database.Model.Tables.Find(analysisServicesTable);
|
||||
if (table == null)
|
||||
{
|
||||
throw new Microsoft.AnalysisServices.ConnectionException($"Could not connect to table {analysisServicesTable}.");
|
||||
}
|
||||
|
||||
//Find template partition
|
||||
Partition templatePartition = table.Partitions.Find(analysisServicesTable);
|
||||
if (templatePartition == null)
|
||||
{
|
||||
throw new InvalidOperationException($"Table {analysisServicesTable} does not contain a partition with the same name to act as the template partition.");
|
||||
}
|
||||
|
||||
//See if there is already a partition with same key - do not want to delete an existing partition in case of inadvertent data loss
|
||||
if (table.Partitions.Find(yearPartitionKey) != null)
|
||||
{
|
||||
throw new InvalidOperationException($"Table {analysisServicesTable} already contains a partition with key {yearPartitionKey}. Please delete this partition and retry.");
|
||||
}
|
||||
|
||||
LogMessage("", false);
|
||||
LogMessage($"Create new merged partition {yearPartitionKey} for table {analysisServicesTable}", false);
|
||||
|
||||
//Create new partition
|
||||
string selectQueryTemplate = DeriveSelectQueryTemplate(Granularity.Yearly);
|
||||
Partition newPartition = new Partition();
|
||||
templatePartition.CopyTo(newPartition);
|
||||
newPartition.Name = yearPartitionKey;
|
||||
((QueryPartitionSource)newPartition.Source).Query = String.Format(selectQueryTemplate, partitionConfig.SourceTableName, partitionConfig.SourcePartitionColumn, yearPartitionKey);
|
||||
table.Partitions.Add(newPartition);
|
||||
LogMessage($"Create new partition {DateFormatPartitionKey(yearPartitionKey, Granularity.Yearly)}", true);
|
||||
|
||||
|
||||
//Merge each month partition and delete it
|
||||
List<string> monthKeysToBeMerged = GetPartitionKeysTable(table, Granularity.Monthly, yearPartitionKey);
|
||||
if (monthKeysToBeMerged.Count == 0)
|
||||
{
|
||||
LogMessage($"No partitinos found in {analysisServicesTable} to be merged into {yearPartitionKey} ...", true);
|
||||
}
|
||||
else
|
||||
{
|
||||
List<Partition> partitionsToBeMerged = new List<Partition>();
|
||||
foreach (string monthKey in monthKeysToBeMerged)
|
||||
{
|
||||
LogMessage($"Partition {DateFormatPartitionKey(monthKey, Granularity.Monthly)} to be merged into {DateFormatPartitionKey(yearPartitionKey, Granularity.Yearly)}", true);
|
||||
partitionsToBeMerged.Add(table.Partitions.Find(monthKey));
|
||||
}
|
||||
|
||||
newPartition.RequestMerge(partitionsToBeMerged);
|
||||
LogMessage($"Save changes for table {analysisServicesTable} ...", true);
|
||||
database.Model.SaveChanges();
|
||||
}
|
||||
|
||||
Console.ForegroundColor = ConsoleColor.White;
|
||||
LogMessage("", false);
|
||||
LogMessage("Finish: " + DateTime.Now.ToString("hh:mm:ss tt"), false);
|
||||
}
|
||||
catch (Exception exc)
|
||||
{
|
||||
Console.ForegroundColor = ConsoleColor.Red;
|
||||
LogMessage("", false);
|
||||
LogMessage($"Exception occurred: {DateTime.Now.ToString("hh:mm:ss tt")}", false);
|
||||
LogMessage($"Exception message: {exc.Message}", false);
|
||||
if (exc.InnerException != null)
|
||||
{
|
||||
LogMessage($"Inner exception message: {exc.InnerException.Message}", false);
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
try
|
||||
{
|
||||
_modelConfiguration = null;
|
||||
_messageLogger = null;
|
||||
if (server != null) server.Disconnect();
|
||||
}
|
||||
catch { }
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Merge day partitions into a month partition.
|
||||
/// </summary>
|
||||
/// <param name="modelConfiguration">Configuration info for the model</param>
|
||||
/// <param name="messageLogger">Pointer to logging method</param>
|
||||
/// <param name="monthPartitionKey">Month partition key following yyyymm</param>
|
||||
public static void MergeDaysToMonth(ModelConfiguration modelConfiguration, LogMessageDelegate messageLogger, string monthPartitionKey)
|
||||
{
|
||||
|
||||
|
||||
}
|
||||
|
||||
#endregion
|
||||
|
||||
#region Private Methods
|
||||
|
||||
private static void IncrementalProcessPartition(string partitionKey, Partition partitionToProcess, Granularity granularity)
|
||||
{
|
||||
if (_modelConfiguration.IncrementalOnline)
|
||||
@ -340,7 +473,7 @@ namespace AsPartitionProcessing
|
||||
return partitionKeys;
|
||||
}
|
||||
|
||||
private static List<string> GetPartitionKeysTable(Table table, Granularity granularity)
|
||||
private static List<string> GetPartitionKeysTable(Table table, Granularity granularity, string filter = "")
|
||||
{
|
||||
List<string> partitionKeysExisting = new List<string>();
|
||||
|
||||
@ -352,7 +485,8 @@ namespace AsPartitionProcessing
|
||||
( (partition.Name.Length == 4 && granularity == Granularity.Yearly) ||
|
||||
(partition.Name.Length == 6 && granularity == Granularity.Monthly) ||
|
||||
(partition.Name.Length == 8 && granularity == Granularity.Daily)
|
||||
) && int.TryParse(partition.Name, out partitionKey)
|
||||
) && int.TryParse(partition.Name, out partitionKey) &&
|
||||
partition.Name.StartsWith(filter)
|
||||
)
|
||||
{
|
||||
partitionKeysExisting.Add(Convert.ToString(partitionKey));
|
||||
@ -362,6 +496,27 @@ namespace AsPartitionProcessing
|
||||
|
||||
return partitionKeysExisting;
|
||||
}
|
||||
|
||||
private static string DeriveSelectQueryTemplate(Granularity granularity)
|
||||
{
|
||||
string selectQueryTemplate;
|
||||
switch (granularity)
|
||||
{
|
||||
case Granularity.Daily:
|
||||
selectQueryTemplate = "SELECT * FROM {0} WHERE {1} = {2} ORDER BY {1}";
|
||||
break;
|
||||
case Granularity.Monthly:
|
||||
selectQueryTemplate = "SELECT * FROM {0} WHERE FLOOR({1} / 100) = {2} ORDER BY {1}";
|
||||
break;
|
||||
default: //Granularity.Yearly:
|
||||
selectQueryTemplate = "SELECT * FROM {0} WHERE FLOOR({1} / 10000) = {2} ORDER BY {1}";
|
||||
break;
|
||||
}
|
||||
|
||||
return selectQueryTemplate;
|
||||
}
|
||||
|
||||
#endregion
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user