merge partitions
This commit is contained in:
parent
101f371ed9
commit
ff012a5c4f
@ -5,26 +5,32 @@ using Microsoft.AnalysisServices.Tabular;
|
||||
|
||||
namespace AsPartitionProcessing.SampleClient
|
||||
{
|
||||
enum SampleExecutionMode
|
||||
{
|
||||
InitializeInline,
|
||||
InitializeFromDatabase,
|
||||
MergePartitions
|
||||
}
|
||||
|
||||
class Program
|
||||
{
|
||||
const bool UseDatabase = false;
|
||||
//Set sample execution mode here:
|
||||
const SampleExecutionMode execMode = SampleExecutionMode.InitializeInline;
|
||||
|
||||
static void Main(string[] args)
|
||||
{
|
||||
try
|
||||
{
|
||||
List<ModelConfiguration> modelsConfig;
|
||||
if (!UseDatabase)
|
||||
if (execMode == SampleExecutionMode.InitializeInline)
|
||||
{
|
||||
modelsConfig = InitializeAdventureWorksInline();
|
||||
modelsConfig = InitializeInline();
|
||||
}
|
||||
else
|
||||
{
|
||||
modelsConfig = InitializeFromDatabase();
|
||||
}
|
||||
|
||||
//PartitionProcessor.MergeMonthsToYear(modelsConfig[0], LogMessage, "Internet Sales", "2012");
|
||||
|
||||
foreach (ModelConfiguration modelConfig in modelsConfig)
|
||||
{
|
||||
if (!modelConfig.IntegratedAuth) //For Azure AS
|
||||
@ -36,10 +42,16 @@ namespace AsPartitionProcessing.SampleClient
|
||||
modelConfig.Password = ReadPassword();
|
||||
}
|
||||
|
||||
//Most important method:
|
||||
if (execMode == SampleExecutionMode.MergePartitions)
|
||||
{
|
||||
PartitionProcessor.MergePartitions(modelConfig, LogMessage, "Internet Sales", Granularity.Yearly, "2012");
|
||||
}
|
||||
else
|
||||
{
|
||||
PartitionProcessor.PerformProcessing(modelConfig, LogMessage);
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (Exception exc)
|
||||
{
|
||||
Console.ForegroundColor = ConsoleColor.Red;
|
||||
@ -51,7 +63,7 @@ namespace AsPartitionProcessing.SampleClient
|
||||
Console.ReadKey();
|
||||
}
|
||||
|
||||
private static List<ModelConfiguration> InitializeAdventureWorksInline()
|
||||
private static List<ModelConfiguration> InitializeInline()
|
||||
{
|
||||
ModelConfiguration partitionedModel = new ModelConfiguration(
|
||||
modelConfigurationID: 1,
|
||||
@ -140,10 +152,13 @@ namespace AsPartitionProcessing.SampleClient
|
||||
private static void LogMessage(string message, ModelConfiguration partitionedModel)
|
||||
{
|
||||
//Can provide custom logger here
|
||||
|
||||
try
|
||||
{
|
||||
if (UseDatabase)
|
||||
if (!(execMode == SampleExecutionMode.InitializeInline))
|
||||
{
|
||||
ConfigDatabaseHelper.LogMessage(message, partitionedModel);
|
||||
}
|
||||
|
||||
Console.WriteLine(message);
|
||||
}
|
||||
|
@ -100,9 +100,9 @@ namespace AsPartitionProcessing
|
||||
LogMessage(new String('-', tableConfiguration.AnalysisServicesTable.Length + 38), false);
|
||||
|
||||
//Figure out what processing needs to be done
|
||||
List<string> partitionKeysCurrent = GetPartitionKeysTable(table, partitioningConfiguration.Granularity);
|
||||
List<string> partitionKeysNew = GetPartitionKeys(false, partitioningConfiguration, partitioningConfiguration.Granularity);
|
||||
List<string> partitionKeysForProcessing = GetPartitionKeys(true, partitioningConfiguration, partitioningConfiguration.Granularity);
|
||||
List<string> partitionKeysCurrent = GetPartitionKeysCurrent(table, partitioningConfiguration.Granularity);
|
||||
List<string> partitionKeysNew = GetPartitionKeysTarget(false, partitioningConfiguration, partitioningConfiguration.Granularity);
|
||||
List<string> partitionKeysForProcessing = GetPartitionKeysTarget(true, partitioningConfiguration, partitioningConfiguration.Granularity);
|
||||
DisplayPartitionRange(partitionKeysCurrent, true, partitioningConfiguration.Granularity);
|
||||
DisplayPartitionRange(partitionKeysNew, false, partitioningConfiguration.Granularity);
|
||||
LogMessage("", false);
|
||||
@ -119,19 +119,13 @@ namespace AsPartitionProcessing
|
||||
}
|
||||
|
||||
//Process partitions
|
||||
string selectQueryTemplate = DeriveSelectQueryTemplate(partitioningConfiguration.Granularity);
|
||||
foreach (string partitionKey in partitionKeysForProcessing)
|
||||
{
|
||||
Partition partitionToProcess = table.Partitions.Find(partitionKey);
|
||||
|
||||
if (partitionToProcess == null)
|
||||
{
|
||||
//Doesn't exist so create it
|
||||
partitionToProcess = new Partition();
|
||||
templatePartition.CopyTo(partitionToProcess);
|
||||
partitionToProcess.Name = partitionKey;
|
||||
((QueryPartitionSource)partitionToProcess.Source).Query = String.Format(selectQueryTemplate, partitioningConfiguration.SourceTableName, partitioningConfiguration.SourcePartitionColumn, partitionKey);
|
||||
table.Partitions.Add(partitionToProcess);
|
||||
partitionToProcess = CreateNewPartition(table, templatePartition, partitioningConfiguration, partitionKey);
|
||||
LogMessage($"Create new partition {DateFormatPartitionKey(partitionKey, partitioningConfiguration.Granularity)}", true);
|
||||
|
||||
if (!_modelConfiguration.InitialSetUp)
|
||||
@ -226,14 +220,28 @@ namespace AsPartitionProcessing
|
||||
}
|
||||
}
|
||||
|
||||
private static Partition CreateNewPartition(Table table, Partition templatePartition, PartitioningConfiguration partitioningConfiguration, string partitionKey)
|
||||
{
|
||||
Partition partitionToProcess;
|
||||
//Doesn't exist so create it
|
||||
string selectQueryTemplate = DeriveSelectQueryTemplate(partitioningConfiguration.Granularity);
|
||||
partitionToProcess = new Partition();
|
||||
templatePartition.CopyTo(partitionToProcess);
|
||||
partitionToProcess.Name = partitionKey;
|
||||
((QueryPartitionSource)partitionToProcess.Source).Query = String.Format(selectQueryTemplate, partitioningConfiguration.SourceTableName, partitioningConfiguration.SourcePartitionColumn, partitionKey);
|
||||
table.Partitions.Add(partitionToProcess);
|
||||
return partitionToProcess;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Merge month partitions into a year partition.
|
||||
/// Merge months into a year, or days into a month.
|
||||
/// </summary>
|
||||
/// <param name="modelConfiguration">Configuration info for the model</param>
|
||||
/// <param name="messageLogger">Pointer to logging method</param>
|
||||
/// <param name="analysisServicesTable">Name of the partitioned table in the tabular model.</param>
|
||||
/// <param name="yearPartitionKey">Year partition key following yyyy</param>
|
||||
public static void MergeMonthsToYear(ModelConfiguration modelConfiguration, LogMessageDelegate messageLogger, string analysisServicesTable, string yearPartitionKey)
|
||||
/// <param name="targetGranularity">Granularity of the newly created partition. Must be year or month.</param>
|
||||
/// <param name="partitionKey">Target partition key. If year, follow yyyy; if month follow yyyymm.</param>
|
||||
public static void MergePartitions(ModelConfiguration modelConfiguration, LogMessageDelegate messageLogger, string analysisServicesTable, Granularity targetGranularity, string partitionKey)
|
||||
{
|
||||
_modelConfiguration = modelConfiguration;
|
||||
_messageLogger = messageLogger;
|
||||
@ -241,14 +249,24 @@ namespace AsPartitionProcessing
|
||||
Server server = new Server();
|
||||
try
|
||||
{
|
||||
//Check new partition key is expected format
|
||||
int partitionKeyParsed;
|
||||
if (yearPartitionKey.Length != 4 || !int.TryParse(yearPartitionKey, out partitionKeyParsed))
|
||||
//Check target granularity
|
||||
if (targetGranularity == Granularity.Daily)
|
||||
{
|
||||
throw new InvalidOperationException($"Partition key {yearPartitionKey} is not of expected format.");
|
||||
throw new InvalidOperationException($"Target granularity for merging must be year or month.");
|
||||
}
|
||||
|
||||
//Check the model configuration contains the partitioned table
|
||||
//Check new partition key is expected format
|
||||
int partitionKeyParsed;
|
||||
if ( !(
|
||||
(partitionKey.Length == 4 && targetGranularity == Granularity.Yearly) ||
|
||||
(partitionKey.Length == 6 && targetGranularity == Granularity.Monthly)
|
||||
) || !int.TryParse(partitionKey, out partitionKeyParsed)
|
||||
)
|
||||
{
|
||||
throw new InvalidOperationException($"Partition key {partitionKey} is not of expected format.");
|
||||
}
|
||||
|
||||
//Check configuration contains the partitioned table
|
||||
bool foundMatch = false;
|
||||
PartitioningConfiguration partitionConfig = null;
|
||||
foreach (TableConfiguration tableConfig in modelConfiguration.TableConfigurations)
|
||||
@ -262,7 +280,7 @@ namespace AsPartitionProcessing
|
||||
}
|
||||
if (!foundMatch)
|
||||
{
|
||||
throw new InvalidOperationException($"Table {analysisServicesTable} not found in model configuration with at least one partitioning configuration (required for source table and column names).");
|
||||
throw new InvalidOperationException($"Table {analysisServicesTable} not found in configuration with at least one partitioning configuration defined.");
|
||||
}
|
||||
|
||||
Database database;
|
||||
@ -282,48 +300,40 @@ namespace AsPartitionProcessing
|
||||
}
|
||||
|
||||
//See if there is already a partition with same key - do not want to delete an existing partition in case of inadvertent data loss
|
||||
if (table.Partitions.Find(yearPartitionKey) != null)
|
||||
if (table.Partitions.Find(partitionKey) != null)
|
||||
{
|
||||
throw new InvalidOperationException($"Table {analysisServicesTable} already contains a partition with key {yearPartitionKey}. Please delete this partition and retry.");
|
||||
throw new InvalidOperationException($"Table {analysisServicesTable} already contains a partition with key {partitionKey}. Please delete this partition and retry.");
|
||||
}
|
||||
|
||||
LogMessage("", false);
|
||||
LogMessage($"Create new merged partition {yearPartitionKey} for table {analysisServicesTable}", false);
|
||||
|
||||
//Create new partition
|
||||
string selectQueryTemplate = DeriveSelectQueryTemplate(Granularity.Yearly);
|
||||
Partition newPartition = new Partition();
|
||||
templatePartition.CopyTo(newPartition);
|
||||
newPartition.Name = yearPartitionKey;
|
||||
((QueryPartitionSource)newPartition.Source).Query = String.Format(selectQueryTemplate, partitionConfig.SourceTableName, partitionConfig.SourcePartitionColumn, yearPartitionKey);
|
||||
table.Partitions.Add(newPartition);
|
||||
LogMessage($"Create new partition {DateFormatPartitionKey(yearPartitionKey, Granularity.Yearly)}", true);
|
||||
|
||||
|
||||
//Merge each month partition and delete it
|
||||
List<string> monthKeysToBeMerged = GetPartitionKeysTable(table, Granularity.Monthly, yearPartitionKey);
|
||||
if (monthKeysToBeMerged.Count == 0)
|
||||
//Check there are partitions to be merged
|
||||
Granularity childGranularity = targetGranularity == Granularity.Yearly ? Granularity.Monthly : Granularity.Daily;
|
||||
List<Partition> partitionsToBeMerged = GetPartitionsCurrent(table, childGranularity, partitionKey);
|
||||
if (partitionsToBeMerged.Count == 0)
|
||||
{
|
||||
LogMessage($"No partitinos found in {analysisServicesTable} to be merged into {yearPartitionKey} ...", true);
|
||||
LogMessage($"No partitinos found in {analysisServicesTable} to be merged into {partitionKey}.", false);
|
||||
}
|
||||
else
|
||||
{
|
||||
List<Partition> partitionsToBeMerged = new List<Partition>();
|
||||
foreach (string monthKey in monthKeysToBeMerged)
|
||||
//Done with validation, so go ahead ...
|
||||
LogMessage("", false);
|
||||
LogMessage($"Create new merged partition {DateFormatPartitionKey(partitionKey, targetGranularity)} for table {analysisServicesTable}", true);
|
||||
Partition newPartition = CreateNewPartition(table, templatePartition, partitionConfig, partitionKey);
|
||||
|
||||
foreach (Partition partition in partitionsToBeMerged)
|
||||
{
|
||||
LogMessage($"Partition {DateFormatPartitionKey(monthKey, Granularity.Monthly)} to be merged into {DateFormatPartitionKey(yearPartitionKey, Granularity.Yearly)}", true);
|
||||
partitionsToBeMerged.Add(table.Partitions.Find(monthKey));
|
||||
LogMessage($"Partition {partition.Name} to be merged into {DateFormatPartitionKey(partitionKey, targetGranularity)}", true);
|
||||
}
|
||||
|
||||
newPartition.RequestMerge(partitionsToBeMerged);
|
||||
LogMessage($"Save changes for table {analysisServicesTable} ...", true);
|
||||
database.Model.SaveChanges();
|
||||
}
|
||||
|
||||
Console.ForegroundColor = ConsoleColor.White;
|
||||
LogMessage("", false);
|
||||
LogMessage("Finish: " + DateTime.Now.ToString("hh:mm:ss tt"), false);
|
||||
}
|
||||
}
|
||||
catch (Exception exc)
|
||||
{
|
||||
Console.ForegroundColor = ConsoleColor.Red;
|
||||
@ -442,7 +452,7 @@ namespace AsPartitionProcessing
|
||||
}
|
||||
}
|
||||
|
||||
private static List<string> GetPartitionKeys(bool forProcessing, PartitioningConfiguration partitioningConfiguration, Granularity granularity)
|
||||
private static List<string> GetPartitionKeysTarget(bool forProcessing, PartitioningConfiguration partitioningConfiguration, Granularity granularity)
|
||||
{
|
||||
//forProcessing: false to return complete target set of partitions, true to return partitons to be processed (may be less if incremental mode).
|
||||
|
||||
@ -473,21 +483,14 @@ namespace AsPartitionProcessing
|
||||
return partitionKeys;
|
||||
}
|
||||
|
||||
private static List<string> GetPartitionKeysTable(Table table, Granularity granularity, string filter = "")
|
||||
private static List<string> GetPartitionKeysCurrent(Table table, Granularity granularity, string filter = "")
|
||||
{
|
||||
List<string> partitionKeysExisting = new List<string>();
|
||||
|
||||
foreach (Partition partition in table.Partitions)
|
||||
{
|
||||
//Ignore partitions that don't follow the convention yyyy, yyyymm or yyyymmdd
|
||||
int partitionKey;
|
||||
if (
|
||||
( (partition.Name.Length == 4 && granularity == Granularity.Yearly) ||
|
||||
(partition.Name.Length == 6 && granularity == Granularity.Monthly) ||
|
||||
(partition.Name.Length == 8 && granularity == Granularity.Daily)
|
||||
) && int.TryParse(partition.Name, out partitionKey) &&
|
||||
partition.Name.StartsWith(filter)
|
||||
)
|
||||
int partitionKey = -1;
|
||||
if (IncludePartition(granularity, filter, partition, ref partitionKey))
|
||||
{
|
||||
partitionKeysExisting.Add(Convert.ToString(partitionKey));
|
||||
}
|
||||
@ -497,6 +500,33 @@ namespace AsPartitionProcessing
|
||||
return partitionKeysExisting;
|
||||
}
|
||||
|
||||
private static List<Partition> GetPartitionsCurrent(Table table, Granularity granularity, string filter = "")
|
||||
{
|
||||
List<Partition> partitionsExisting = new List<Partition>();
|
||||
|
||||
foreach (Partition partition in table.Partitions)
|
||||
{
|
||||
int partitionKey = -1;
|
||||
if (IncludePartition(granularity, filter, partition, ref partitionKey))
|
||||
{
|
||||
partitionsExisting.Add(partition);
|
||||
}
|
||||
}
|
||||
partitionsExisting.Sort();
|
||||
|
||||
return partitionsExisting;
|
||||
}
|
||||
|
||||
private static bool IncludePartition(Granularity granularity, string filter, Partition partition, ref int partitionKey)
|
||||
{
|
||||
//Ignore partitions that don't follow the convention yyyy, yyyymm or yyyymmdd, or are not included in the filter expression
|
||||
return ( (partition.Name.Length == 4 && granularity == Granularity.Yearly) ||
|
||||
(partition.Name.Length == 6 && granularity == Granularity.Monthly) ||
|
||||
(partition.Name.Length == 8 && granularity == Granularity.Daily)
|
||||
) && int.TryParse(partition.Name, out partitionKey) &&
|
||||
partition.Name.StartsWith(filter);
|
||||
}
|
||||
|
||||
private static string DeriveSelectQueryTemplate(Granularity granularity)
|
||||
{
|
||||
string selectQueryTemplate;
|
||||
|
Loading…
Reference in New Issue
Block a user