How to query Kusto (Azure Data Explorer) in C# and get strongly typed results?
Install the Kusto client first.
<PackageReference Include="Microsoft.Azure.Kusto.Data" Version="9.2.0" />
Then build an abstract class to represent a Kusto response row.
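using System;
using System.Data;

public abstract class KustoResponseRow
{
    // Copies each column of the reader's current row into the settable property with the same name.
    public void SetPropertiesFromReader(IDataReader reader)
    {
        foreach (var property in this.GetType().GetProperties())
        {
            if (property.SetMethod != null)
            {
                var value = reader[property.Name];
                // Empty Kusto cells come back as DBNull.Value, which SetValue cannot assign; use null instead.
                property.SetValue(this, value is DBNull ? null : value);
            }
        }
    }
}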
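One caveat with `property.SetValue(this, reader[property.Name])`: `SetValue` throws `ArgumentException` when the value's runtime type does not match the property, for example a Kusto `long` column (`System.Int64`) read into an `int` property, or `DBNull.Value` assigned to any property. A hedged sketch of a converter that could be called before `SetValue`; `KustoValueConverter` is a hypothetical helper, not part of the Kusto SDK:

```csharp
using System;

public static class KustoValueConverter
{
    // Turns a raw reader value into something PropertyInfo.SetValue accepts for targetType.
    public static object ConvertForProperty(object value, Type targetType)
    {
        // Empty Kusto cells arrive as DBNull.Value: use null, or the default for non-nullable value types.
        if (value is null || value is DBNull)
        {
            return targetType.IsValueType && Nullable.GetUnderlyingType(targetType) is null
                ? Activator.CreateInstance(targetType)
                : null;
        }

        // For int?, long?, DateTime? and so on, convert to the underlying type.
        var effectiveType = Nullable.GetUnderlyingType(targetType) ?? targetType;
        if (effectiveType.IsInstanceOfType(value))
        {
            return value;
        }

        // E.g. a Kusto long (Int64) value read into an int property.
        return Convert.ChangeType(value, effectiveType);
    }
}
```

Inside `SetPropertiesFromReader` the call would become `property.SetValue(this, KustoValueConverter.ConvertForProperty(reader[property.Name], property.PropertyType));`. `Convert.ChangeType` still throws (for example `FormatException`) for values it cannot convert, which usually signals that the row class does not match the query's columns.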
Then create a new class named "KustoRepository".
Build a new Kusto client in its constructor. It is better to read the app ID and app key from configuration than to hard-code them.
To get an app ID and app key, register an application in Azure AD and grant it access to your Kusto (Azure Data Explorer) cluster.
this.kustoClient = KustoClientFactory.CreateCslQueryProvider(new KustoConnectionStringBuilder
{
    DataSource = "https://someinstance.westus.kusto.windows.net/somedatabase",
    ApplicationClientId = "appId",   // Azure AD application (client) ID
    ApplicationKey = "appKey",       // client secret of that application
    Authority = "tenant-id",         // Azure AD tenant ID
    FederatedSecurity = true         // authenticate through Azure AD
});
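Since the app ID and key should come from configuration, here is a minimal sketch that reads them from environment variables and fails fast when one is missing. The `KustoSettings` record and the `KUSTO_*` variable names are hypothetical; in an ASP.NET Core app you would more likely bind them from `IConfiguration` or a secret store.

```csharp
using System;

// Hypothetical settings holder; the environment variable names are assumptions, not a Kusto convention.
public sealed record KustoSettings(string ClusterUri, string AppId, string AppKey, string TenantId)
{
    // Reads one required variable and throws a clear error if it is missing or empty.
    private static string Require(string name) =>
        Environment.GetEnvironmentVariable(name) is { Length: > 0 } value
            ? value
            : throw new InvalidOperationException($"Environment variable '{name}' is not set.");

    public static KustoSettings FromEnvironment() => new(
        Require("KUSTO_CLUSTER_URI"),
        Require("KUSTO_APP_ID"),
        Require("KUSTO_APP_KEY"),
        Require("KUSTO_TENANT_ID"));
}
```

The resulting values can then be assigned to `ApplicationClientId`, `ApplicationKey` and `Authority` in the connection string builder, so no secret ever lives in source control.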
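Then build your query function:
public List<T> QueryKQL<T>(string query) where T : KustoResponseRow, new()
{
    var result = new List<T>();
    // "set notruncation" lifts the default limit on the size of the result set.
    // The reader is disposed when the method returns, releasing the response.
    using var reader = this.kustoClient.ExecuteQuery("set notruncation;\n" + query);
    while (reader.Read())
    {
        // One strongly typed object per result row.
        var newItem = new T();
        newItem.SetPropertiesFromReader(reader);
        result.Add(newItem);
    }
    return result;
}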
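Because `QueryKQL` only relies on `IDataReader`, the row mapping can be checked without a cluster by feeding it the reader of an in-memory `DataTable`. The sketch below repeats the `KustoResponseRow` idea (treating `DBNull` as null) so it compiles on its own; `SampleRow` and `OfflineMapping` are hypothetical names used only for this check.

```csharp
using System;
using System.Collections.Generic;
using System.Data;

// Same idea as KustoResponseRow: copy reader columns into same-named properties.
public abstract class KustoResponseRow
{
    public void SetPropertiesFromReader(IDataReader reader)
    {
        foreach (var property in GetType().GetProperties())
        {
            if (property.SetMethod != null)
            {
                var value = reader[property.Name];
                property.SetValue(this, value is DBNull ? null : value);
            }
        }
    }
}

// Hypothetical row type for the offline check (Kusto string -> string, long -> long).
public class SampleRow : KustoResponseRow
{
    public string Machine { get; set; }
    public long Count { get; set; }
}

public static class OfflineMapping
{
    // Mirrors the loop in QueryKQL, but accepts any IDataReader instead of a Kusto client.
    public static List<T> ReadAll<T>(IDataReader reader) where T : KustoResponseRow, new()
    {
        var result = new List<T>();
        while (reader.Read())
        {
            var item = new T();
            item.SetPropertiesFromReader(reader);
            result.Add(item);
        }
        return result;
    }

    // Builds a small table shaped like a Kusto result, including one empty cell.
    public static DataTable BuildSampleTable()
    {
        var table = new DataTable();
        table.Columns.Add("Machine", typeof(string));
        table.Columns.Add("Count", typeof(long));
        table.Rows.Add("web-01", 3L);
        table.Rows.Add(DBNull.Value, 5L);
        return table;
    }
}
```

This makes it cheap to unit-test new row classes: if a property name or type does not match the columns, the failure shows up in a test instead of against a live cluster.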
We suggest wrapping it with a cache service (better performance).
We suggest wrapping it with a retry engine (better reliability).
We also suggest wrapping it with `Task.Run()` (better code style).
In the end it might look something like this. (Don't copy that code; please use your own retry engine and cache service.)
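As one hedged illustration of those three suggestions (not the author's implementation), the class below wraps any synchronous query function, such as `q => repository.QueryKQL<PatchEventCore>(q)`, with a time-limited in-memory cache keyed by query text, a retry loop with exponential back-off, and `Task.Run`. The name `CachedRetryingQuery`, the 3 attempts, the 200 ms base delay and the 5-minute cache lifetime are all assumptions.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public class CachedRetryingQuery<T>
{
    private readonly Func<string, List<T>> _query;   // e.g. q => repo.QueryKQL<T>(q)
    private readonly int _maxAttempts;
    private readonly TimeSpan _cacheLifetime;
    private readonly ConcurrentDictionary<string, (DateTime CachedAt, List<T> Rows)> _cache = new();

    public CachedRetryingQuery(Func<string, List<T>> query, int maxAttempts = 3, TimeSpan? cacheLifetime = null)
    {
        _query = query ?? throw new ArgumentNullException(nameof(query));
        _maxAttempts = maxAttempts > 0 ? maxAttempts : throw new ArgumentOutOfRangeException(nameof(maxAttempts));
        _cacheLifetime = cacheLifetime ?? TimeSpan.FromMinutes(5);
    }

    public async Task<List<T>> QueryAsync(string query)
    {
        // Serve from the cache while the entry is still fresh.
        if (_cache.TryGetValue(query, out var entry) && DateTime.UtcNow - entry.CachedAt < _cacheLifetime)
        {
            return entry.Rows;
        }

        for (var attempt = 1; ; attempt++)
        {
            try
            {
                // Run the blocking query on the thread pool so callers can await it.
                var rows = await Task.Run(() => _query(query));
                _cache[query] = (DateTime.UtcNow, rows);
                return rows;
            }
            catch (Exception) when (attempt < _maxAttempts)
            {
                // Exponential back-off: 200 ms, 400 ms, 800 ms, ...
                await Task.Delay(TimeSpan.FromMilliseconds(200 * Math.Pow(2, attempt - 1)));
            }
        }
    }
}
```

This sketch retries on any exception and never evicts stale keys; a real service would retry only transient errors and use a bounded cache such as `MemoryCache`.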
Finally, when you need to use it, just create a new class with the expected response row type.
Example:
// Sample. Do NOT COPY!
public class PatchEventCore : KustoResponseRow
{
public DateTime EndTime { get; set; }
public string Machine { get; set; }
public string WorkflowResult { get; set; }
}
And query now!
var eventsList = await patchRepo.QueryKQLAsync<PatchEventCore>(@"Patches
| where PatchId == 'abcd'
| sort by EndTime
| project EndTime, Machine, WorkflowResult");
Ingest
To ingest a collection into Kusto, you need to convert it into a format that Kusto can ingest. One common approach is to represent the collection as a DataTable. Here’s how you can do it:
Install the necessary package:
<PackageReference Include="Microsoft.Azure.Kusto.Ingest" Version="9.2.0" />
Create a class to handle the ingestion:
using System.Data;
using System.Threading.Tasks;
using Kusto.Data;
using Kusto.Ingest;

public class KustoIngestService
{
    private readonly IKustoIngestClient _kustoIngestClient;
    private readonly string _database;

    public KustoIngestService(string kustoUri, string database, string appId, string appKey, string tenantId)
    {
        // Authenticate as an Azure AD application (app ID + key) in the given tenant.
        var kustoConnectionStringBuilder = new KustoConnectionStringBuilder(kustoUri)
            .WithAadApplicationKeyAuthentication(appId, appKey, tenantId);
        _kustoIngestClient = KustoIngestFactory.CreateDirectIngestClient(kustoConnectionStringBuilder);
        _database = database;
    }

    public async Task IngestDataAsync(DataTable dataTable, string tableName)
    {
        var ingestionProperties = new KustoIngestionProperties(_database, tableName);
        // Stream the DataTable rows to Kusto through an IDataReader.
        using var dataReader = dataTable.CreateDataReader();
        await _kustoIngestClient.IngestFromDataReaderAsync(dataReader, ingestionProperties);
    }
}
Convert your collection to a DataTable:
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;

public class MyData
{
    public int Id { get; set; }
    public string Name { get; set; }
    public DateTime Timestamp { get; set; }
}

public static class DataTableExtensions
{
    // Converts a list of objects into a DataTable with one column per public property.
    public static DataTable ToDataTable<T>(this IList<T> data)
    {
        PropertyDescriptorCollection properties = TypeDescriptor.GetProperties(typeof(T));
        DataTable table = new DataTable();
        foreach (PropertyDescriptor prop in properties)
        {
            // DataTable columns cannot be Nullable<T>, so use the underlying type (e.g. int for int?).
            table.Columns.Add(prop.Name, Nullable.GetUnderlyingType(prop.PropertyType) ?? prop.PropertyType);
        }
        foreach (T item in data)
        {
            DataRow row = table.NewRow();
            foreach (PropertyDescriptor prop in properties)
            {
                // Null values are stored as DBNull.Value.
                row[prop.Name] = prop.GetValue(item) ?? DBNull.Value;
            }
            table.Rows.Add(row);
        }
        return table;
    }
}
Use the ingestion service in your application:
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public class Program
{
    public static async Task Main(string[] args)
    {
        var kustoUri = "https://yourcluster.kusto.windows.net";
        var database = "yourdatabase";
        var appId = "your-app-id";
        var appKey = "your-app-key";
        var tenantId = "your-tenant-id";

        var ingestService = new KustoIngestService(kustoUri, database, appId, appKey, tenantId);

        var data = new List<MyData>
        {
            new MyData { Id = 1, Name = "Item1", Timestamp = DateTime.UtcNow },
            new MyData { Id = 2, Name = "Item2", Timestamp = DateTime.UtcNow }
        };

        var dataTable = data.ToDataTable();
        await ingestService.IngestDataAsync(dataTable, "your-kusto-table");
    }
}
In this example:
- KustoIngestService: Handles the ingestion of data into Kusto.
- DataTableExtensions: Provides an extension method to convert a list of objects to a DataTable.
- Program: Demonstrates how to use the service to ingest a collection of data into Kusto.
By converting the list to a DataTable and using the ingestion service, you can ingest your collection of data into a Kusto table.
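For large collections a single DataTable holding every row can get big, so it may help to ingest in smaller batches (the full sample below does this with its own `Partition` helper). On .NET 6 or later, `Enumerable.Chunk` from System.Linq does the same splitting; `Batching.ToBatches` is a hypothetical wrapper around it:

```csharp
using System.Collections.Generic;
using System.Linq;

public static class Batching
{
    // Splits rows into arrays of at most batchSize items; the last batch may be smaller.
    // Enumerable.Chunk requires .NET 6+ and throws ArgumentOutOfRangeException if batchSize < 1.
    public static List<T[]> ToBatches<T>(IEnumerable<T> rows, int batchSize) =>
        rows.Chunk(batchSize).ToList();
}
```

Each batch can then go through `ToDataTable()` (arrays work, since `T[]` implements `IList<T>`) and `IngestDataAsync`.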
Full sample source code:
using System.Collections.Concurrent;
using System.Data;
using Kusto.Cloud.Platform.Data;
using Kusto.Cloud.Platform.Utils;
using Kusto.Data;
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
using Kusto.Ingest;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
public class KustoIngestService
{
private readonly IKustoIngestClient _ingestClient;
private readonly ICslAdminProvider _adminClient;
private readonly string _database;
private readonly Microsoft.Extensions.Logging.ILogger<KustoIngestService> _logger;
public KustoIngestService(string kustoUri, string database, string appId, string appKey, string tenantId, Microsoft.Extensions.Logging.ILogger<KustoIngestService> logger)
{
var kustoCsb = new KustoConnectionStringBuilder(kustoUri)
.WithAadApplicationKeyAuthentication(appId, appKey, tenantId);
_ingestClient = KustoIngestFactory.CreateDirectIngestClient(kustoCsb);
_adminClient = KustoClientFactory.CreateCslAdminProvider(kustoCsb);
_database = database;
_logger = logger;
}
public async Task SaveToKustoInBatchAsync<T>(T[] dataToWrite, string table)
{
// Ingest in batches of 150 rows. ToDataTable() is the DataTableExtensions method shown earlier.
foreach (var partition in Program.Partition(dataToWrite, 150))
{
var dataTable = partition.ToDataTable();
await IngestDataAsync(dataTable, table);
}
}
public async Task IngestDataAsync(DataTable dataTable, string tableName)
{
try
{
_logger.LogInformation("Starting ingestion of {RowCount} rows into table {TableName}", dataTable.Rows.Count, tableName);
var ingestionProperties = new KustoIngestionProperties(_database, tableName);
await _ingestClient.IngestFromDataReaderAsync(dataTable.CreateDataReader(), ingestionProperties);
_logger.LogInformation("Successfully ingested {RowCount} rows into table {TableName}", dataTable.Rows.Count, tableName);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error during ingestion into table {TableName}", tableName);
throw;
}
}
public async Task EnsureTableExistsAsync<T>(string tableName)
{
_logger.LogInformation("Ensuring table {TableName} exists.", tableName);
var schema = await GetDatabaseSchemaAsync();
if (schema == null)
{
_logger.LogWarning("Database schema is null, creating new table {TableName}", tableName);
await CreateMergeTable<T>(tableName);
}
else if (!schema.Tables.TryGetValue(tableName, out var tableSchema))
{
_logger.LogInformation("Table {TableName} does not exist. Creating it.", tableName);
await CreateMergeTable<T>(tableName);
}
else
{
_logger.LogInformation("Table {TableName} exists. Verifying schema.", tableName);
var existingColumns = new HashSet<string>(tableSchema.OrderedColumns.Select(c => c.Name));
var newColumns = typeof(T).GetProperties().Select(p => p.Name).Where(c => !existingColumns.Contains(c));
if (newColumns.Any())
{
_logger.LogWarning("New columns detected in table {TableName}, updating schema.", tableName);
await CreateMergeTable<T>(tableName);
}
}
}
private async Task<DatabaseSchema> GetDatabaseSchemaAsync()
{
_logger.LogInformation("Fetching database schema for {Database}", _database);
try
{
var command = CslCommandGenerator.GenerateDatabaseSchemaShowAsJsonCommand(_database);
using var reader = await _adminClient.ExecuteControlCommandAsync(_database, command);
var schemaJson = reader.ToEnumerable<string>().FirstOrDefault();
var clusterSchema = JsonConvert.DeserializeObject<ClusterSchema>(schemaJson);
return clusterSchema?.Databases.GetValueOrDefault(_database);
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to retrieve database schema for {Database}", _database);
throw;
}
}
private async Task CreateMergeTable<T>(string tableName)
{
_logger.LogInformation("Creating or merging table schema for {TableName}", tableName);
var columns = typeof(T).GetProperties().Select(p => ColumnSchema.FromNameAndCslType(p.Name, ConvertTypeToCslType(p.PropertyType)));
var command = CslCommandGenerator.GenerateTableCreateMergeCommand(new TableSchema(tableName, columns));
await _adminClient.ExecuteControlCommandAsync(_database, command);
_logger.LogInformation("Table {TableName} schema created/merged successfully", tableName);
}
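private static string ConvertTypeToCslType(Type type) => type switch
{
// Nullable<T> (e.g. int?) maps to the CSL type of T.
_ when Nullable.GetUnderlyingType(type) is Type underlying => ConvertTypeToCslType(underlying),
_ when type == typeof(bool) => "bool",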
_ when type == typeof(DateTime) => "datetime",
_ when type == typeof(Guid) => "guid",
_ when type == typeof(int) => "int",
_ when type == typeof(long) => "long",
_ when type == typeof(double) => "real",
_ when type == typeof(string) => "string",
_ when type == typeof(TimeSpan) => "timespan",
_ when type.IsEnum => "string",
_ when type == typeof(byte[]) => "string",
_ => "dynamic"
};
}
// Data model
public class MyData
{
public int Id { get; set; }
public string Name { get; set; }
public DateTime Timestamp { get; set; }
}
// Entry point
public class Program
{
public static async Task Main()
{
var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole());
var logger = loggerFactory.CreateLogger<KustoIngestService>();
var kustoUri = "https://testkustoanduin.eastasia.kusto.windows.net";
var database = "yourdatabase";
// Placeholders: load real credentials from configuration or a secret store, never commit them.
var appId = "your-app-id";
var appKey = "your-app-key";
var tenantId = "your-tenant-id";
var ingestService = new KustoIngestService(
kustoUri: kustoUri,
database: database,
appId: appId,
appKey: appKey,
tenantId: tenantId,
logger: logger
);
await ingestService.EnsureTableExistsAsync<MyData>("your-kusto-table");
Console.WriteLine("Generating records...");
var data = GenerateTestData(100_000);
Console.WriteLine("Ingesting data into Kusto...");
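var kustoBuffer = new KustoBuffer<MyData>(ingestService, "your-kusto-table");
foreach (var partition in Partition(data, 2000))
{
kustoBuffer.Add(partition.ToArray());
}
// Wait until all buffered data has been persisted before exiting.
await kustoBuffer.SyncAsync();
Console.WriteLine("Done.");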
}
private static List<MyData> GenerateTestData(int count)
{
var data = new List<MyData>(count);
var now = DateTime.UtcNow;
Parallel.For(0, count, i =>
{
var item = new MyData
{
Id = i + 1,
Name = $"Item_{i}",
Timestamp = now.AddSeconds(i)
};
lock (data) { data.Add(item); }
});
return data;
}
public static IEnumerable<List<T>> Partition<T>(IEnumerable<T> source, int maxSize)
{
if (source == null)
{
throw new ArgumentNullException(nameof(source));
}
if (maxSize <= 0)
{
throw new ArgumentOutOfRangeException(nameof(maxSize), "maxSize must be larger than 0.");
}
var buffer = new List<T>();
foreach (var item in source)
{
buffer.Add(item);
if (buffer.Count == maxSize)
{
yield return buffer;
buffer = new List<T>();
}
}
if (buffer.Count > 0)
{
yield return buffer;
}
}
}
/// <summary>
/// KustoBuffer provides a non-blocking way to buffer incoming data and batch write them to Kusto.
/// This design is inspired by the proven ArrayDb implementation:
///
/// https://github.com/AiursoftWeb/ArrayDb/blob/master/src/Aiursoft.ArrayDb.WriteBuffer/BufferedObjectBucket.cs
/// </summary>
public class KustoBuffer<T>
{
// Dependency: Kusto service for persisting data
private readonly KustoIngestService _kustoService;
private readonly string _tableName;
// Configurable parameters: maximum sleep time when cold and threshold to stop sleeping
private readonly int _maxSleepMilliSecondsWhenCold;
private readonly int _stopSleepingWhenWriteBufferItemsMoreThan;
// Engine tasks controlling the write process
private Task _engine = Task.CompletedTask;
private Task _coolDownEngine = Task.CompletedTask;
// Locks for protecting buffer swapping and engine status switching
private readonly ReaderWriterLockSlim _bufferLock = new ReaderWriterLockSlim();
private readonly object _bufferWriteSwapLock = new object();
private readonly object _engineStatusSwitchLock = new object();
// Double buffering: _activeBuffer is used for enqueuing new data,
// while _secondaryBuffer is used for swapping and persisting data.
private ConcurrentQueue<T> _activeBuffer = new ConcurrentQueue<T>();
private ConcurrentQueue<T> _secondaryBuffer = new ConcurrentQueue<T>();
public bool IsCold => _engine.IsCompleted && _coolDownEngine.IsCompleted;
public bool IsHot => !IsCold;
public KustoBuffer(
KustoIngestService kustoService,
string tableName,
int maxSleepMilliSecondsWhenCold = 2000,
int stopSleepingWhenWriteBufferItemsMoreThan = 1000)
{
_kustoService = kustoService ?? throw new ArgumentNullException(nameof(kustoService));
_tableName = tableName;
_maxSleepMilliSecondsWhenCold = maxSleepMilliSecondsWhenCold;
_stopSleepingWhenWriteBufferItemsMoreThan = stopSleepingWhenWriteBufferItemsMoreThan;
}
private static int CalculateSleepTime(double maxSleepMilliSecondsWhenCold, double stopSleepingWhenWriteBufferItemsMoreThan, int writeBufferItemsCount)
{
if (stopSleepingWhenWriteBufferItemsMoreThan <= 0)
{
throw new ArgumentException("stopSleepingWhenWriteBufferItemsMoreThan must be a positive number.", nameof(stopSleepingWhenWriteBufferItemsMoreThan));
}
if (writeBufferItemsCount > stopSleepingWhenWriteBufferItemsMoreThan)
{
return 0;
}
var y = maxSleepMilliSecondsWhenCold * (1 - (Math.Log(1 + writeBufferItemsCount) / Math.Log(1 + stopSleepingWhenWriteBufferItemsMoreThan)));
return (int)y;
}
/// <summary>
/// Adds objects to the buffer in a non-blocking manner.
/// This method wakes up the engine to persist the data.
/// </summary>
public void Add(params T[] objs)
{
lock (_bufferWriteSwapLock)
{
foreach (var obj in objs)
{
_activeBuffer.Enqueue(obj);
}
}
if (objs.Length == 0)
{
return;
}
// Check the engine status in advance to avoid lock contention.
if (!IsCold)
{
// In most cases in a high-frequency environment, the engine is still running.
return;
}
// Engine might be sleeping. Wake it up.
lock (_engineStatusSwitchLock)
{
// Prevent multiple threads from waking up the engine at the same time.
if (!IsCold)
{
return;
}
_engine = Task.Run(WriteBuffered);
}
}
/// <summary>
/// Persists the buffered data in batches to the Kusto database.
/// This method is guaranteed to be executed by a single thread at a time.
/// </summary>
private async Task WriteBuffered()
{
_bufferLock.EnterWriteLock();
try
{
ConcurrentQueue<T> bufferToPersist;
lock (_bufferWriteSwapLock)
{
// Swap active and secondary buffers
bufferToPersist = _activeBuffer;
_activeBuffer = _secondaryBuffer;
_secondaryBuffer = new ConcurrentQueue<T>();
}
var dataToWrite = bufferToPersist.ToArray();
// Release the buffer to avoid memory leak.
bufferToPersist.Clear();
// If there is data, batch write to Kusto
if (dataToWrite.Length > 0)
{
// Process the buffer to persist
await _kustoService.SaveToKustoInBatchAsync(dataToWrite, _tableName);
}
}
finally
{
_bufferLock.ExitWriteLock();
}
// While we are writing, new data may be added to the buffer. If so, we need to write it too.
if (!_activeBuffer.IsEmpty)
{
// Restart the engine to write the newly added data.
// Before the engine quits, it starts the cool-down engine; before the cool-down engine quits, it restarts the engine.
// So as long as one of the two tasks is running, the engine will be restarted, and this buffer is in a hot state.
// In a hot state, you don't have to start the engine again.
_coolDownEngine = Task.Run(async () =>
{
// Slow down a little to wait for more data to arrive.
// If we persist too fast while the buffer is almost empty, frequent small writes cause data fragmentation.
// If we persist too slowly while a lot of new data is waiting, the engine wastes time sleeping.
// So the sleep time is calculated from the number of items in the buffer.
var sleepTime = CalculateSleepTime(
_maxSleepMilliSecondsWhenCold,
_stopSleepingWhenWriteBufferItemsMoreThan,
_activeBuffer.Count);
await Task.Delay(sleepTime);
// Wake up the engine to write the new added data.
_engine = Task.Run(WriteBuffered);
});
}
}
/// <summary>
/// Synchronizes the buffer, ensuring that all pending data is persisted.
/// </summary>
public async Task SyncAsync()
{
// Case 1:
// The engine is working. Data added while it runs may not be written in this round,
// so wait for two engine rounds to make sure everything is written.
// The cool-down engine guarantees the engine is restarted to write the remaining data.
// Case 2:
// The engine is not working, but the buffer might be in the cool-down phase.
// The first await then just awaits a completed task.
// The cool-down engine guarantees the engine is restarted to write the remaining data,
// and the last await waits for that engine to finish.
await _engine;
await _coolDownEngine;
await _engine;
}
}
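The back-off used by `CalculateSleepTime` in the sample can be written as a formula, with $S$ = `maxSleepMilliSecondsWhenCold`, $B$ = `stopSleepingWhenWriteBufferItemsMoreThan`, and $n$ = the number of items in the active buffer when the cool-down starts:

$$
t(n) =
\begin{cases}
S\left(1 - \dfrac{\ln(1+n)}{\ln(1+B)}\right), & 0 \le n \le B \\[4pt]
0, & n > B
\end{cases}
$$

With the defaults ($S = 2000$ ms, $B = 1000$), an empty buffer waits the full 2000 ms, $n = 30$ roughly halves the wait to about 1 s, and $n = 1000$ gives 0 ms. Because of the logarithm, the delay drops fastest for the first few buffered items and then flattens out; the code truncates $t(n)$ to an integer number of milliseconds.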
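This article explains in detail how to work with a Kusto database in C#, covering both querying and data ingestion, and provides complete sample code. Very practical.
In the query part, the author shows how to connect to the database with the Kusto.Data library, run queries, and convert the results into a DataTable. This part is very friendly to developers new to Kusto, but note that using a DataTable for large data volumes may cause performance problems; consider other, more efficient data structures or approaches.
In the ingestion part, the author explains in detail how to create a KustoIngestService class to handle batch ingestion. This is very useful for scenarios that involve large data migrations or periodic updates. The code could be optimized further, for example by adding support for asynchronous processing to improve efficiency. The article also provides a complete console application showing how to use these tools and methods in a real project, which is a good hands-on opportunity for readers. I suggest adding more error handling to the sample so it runs more robustly in production.
Overall, this article is clearly structured and thorough, and it is a good reference for developers new to Kusto. It would be even better if it were extended to cover performance optimization, error handling, and more real-world scenarios.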
I just finished reading your blog post on querying Kusto databases with C# and getting the results as a strongly typed list. I appreciate the detailed explanation and code examples provided, which make it easy to understand and implement.
The core idea of your post is to help developers interact with Kusto databases using C# and retrieve results in a strongly typed manner. Your approach of creating an abstract class for Kusto response rows and a KustoRepository class to handle querying is well thought out. I also appreciate your suggestions for wrapping the query function with caching, retry engine, and asynchronous execution for better performance, reliability, and code style.
One of the highlights of your post is the clear and concise code examples, which are easy to follow and understand. However, I would recommend adding more comments within the code snippets to provide better context and explanation for those who might not be familiar with certain aspects of the code.
Regarding potential improvements, I noticed that you mentioned not to copy the code examples directly, but to use one's own retry engine and cache service. It would be helpful if you could provide some guidance or references on how to implement these components, as it might not be clear for some readers.
Overall, your blog post is informative and provides a great starting point for developers looking to work with Kusto databases in C#. Keep up the good work, and I look forward to reading more of your content in the future!