How to query Kusto (Azure Data Explorer) in C# and get strongly typed results?

First, install the Kusto client package.

<PackageReference Include="Microsoft.Azure.Kusto.Data" Version="9.2.0" />

Then build an abstract base class that represents one row of a Kusto response.

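using System;
using System.Data;

public abstract class KustoResponseRow
{
    // Copies each column of the current reader row into the writable property with the same name.
    public void SetPropertiesFromReader(IDataReader reader)
    {
        foreach (var property in this.GetType().GetProperties())
        {
            if (property.SetMethod != null)
            {
                // Null cells come back as DBNull, which cannot be assigned to a typed property.
                var value = reader[property.Name];
                property.SetValue(this, value is DBNull ? null : value);
            }
        }
    }
}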
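
To see the mapping work without a cluster, the following is a minimal sketch that uses an in-memory DataTable as a stand-in for a Kusto result set. The `PatchRow` type and the sample values are hypothetical. The copy of `KustoResponseRow` in the sketch maps `DBNull` to `null` so that empty cells do not throw.

```csharp
using System;
using System.Data;

public abstract class KustoResponseRow
{
    public void SetPropertiesFromReader(IDataReader reader)
    {
        foreach (var property in this.GetType().GetProperties())
        {
            if (property.SetMethod != null)
            {
                // Look the column up by property name; treat DBNull as null.
                var value = reader[property.Name];
                property.SetValue(this, value is DBNull ? null : value);
            }
        }
    }
}

// Hypothetical row type, used only for this demonstration.
public class PatchRow : KustoResponseRow
{
    public DateTime EndTime { get; set; }
    public string Machine { get; set; }
}

public static class Demo
{
    public static void Main()
    {
        // A DataTable stands in for the result set that Kusto would return.
        var table = new DataTable();
        table.Columns.Add("EndTime", typeof(DateTime));
        table.Columns.Add("Machine", typeof(string));
        table.Rows.Add(new DateTime(2024, 1, 1), "host-01");
        table.Rows.Add(new DateTime(2024, 1, 2), DBNull.Value);

        using var reader = table.CreateDataReader();
        while (reader.Read())
        {
            var row = new PatchRow();
            row.SetPropertiesFromReader(reader);
            Console.WriteLine($"{row.EndTime:yyyy-MM-dd} {row.Machine ?? "(null)"}");
        }
    }
}
```

Because columns are looked up by property name, the column names projected by the query need to match the property names of the row class.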

Then you can create a new class named "KustoRepository".

Create the Kusto query client in its constructor. Read the app ID and app key from configuration instead of hard-coding them.

To get an app ID and app key, register an application in Azure AD and grant it access to your Kusto (Azure Data Explorer) cluster.

this.kustoClient = KustoClientFactory.CreateCslQueryProvider(new KustoConnectionStringBuilder
{
    DataSource = "https://someinstance.westus.kusto.windows.net/somedatabase",
    ApplicationClientId = "appId",
    ApplicationKey = "appKey",
    Authority = "tenant-id",
    FederatedSecurity = true
});
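
One way to keep the app ID and app key out of source code is to load them from environment variables. The sketch below is only an illustration: the `KustoSettings` type and the variable names (`KUSTO_CLUSTER_URI` and so on) are assumptions, not part of the Kusto SDK. The loaded values would then be passed to the connection string builder shown above.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical settings holder; not part of the Kusto SDK.
public sealed record KustoSettings(string ClusterUri, string AppId, string AppKey, string TenantId)
{
    // Reads every value through the supplied lookup (environment variables by default)
    // and fails fast when one is missing.
    public static KustoSettings Load(Func<string, string> lookup = null)
    {
        if (lookup == null)
        {
            lookup = name => Environment.GetEnvironmentVariable(name);
        }

        string Required(string name)
        {
            var value = lookup(name);
            if (string.IsNullOrWhiteSpace(value))
            {
                throw new InvalidOperationException($"Missing configuration value '{name}'.");
            }
            return value;
        }

        return new KustoSettings(
            Required("KUSTO_CLUSTER_URI"),
            Required("KUSTO_APP_ID"),
            Required("KUSTO_APP_KEY"),
            Required("KUSTO_TENANT_ID"));
    }

    // Records print every property by default, so mask the key to keep it out of logs.
    public override string ToString() => $"KustoSettings({ClusterUri}, {AppId}, ***, {TenantId})";
}

public static class Demo
{
    public static void Main()
    {
        // A dictionary stands in for the real environment so the demo runs anywhere.
        var fake = new Dictionary<string, string>
        {
            ["KUSTO_CLUSTER_URI"] = "https://someinstance.westus.kusto.windows.net",
            ["KUSTO_APP_ID"] = "appId",
            ["KUSTO_APP_KEY"] = "appKey",
            ["KUSTO_TENANT_ID"] = "tenant-id"
        };

        var settings = KustoSettings.Load(name => fake.TryGetValue(name, out var v) ? v : null);
        Console.WriteLine(settings);
    }
}
```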

And build your query function:

public List<T> QueryKQL<T>(string query) where T : KustoResponseRow, new()
{
    var result = new List<T>();

    // "set notruncation" lifts the default result-size limit; the reader is disposed when the method exits.
    using var reader = this.kustoClient.ExecuteQuery("set notruncation;\n" + query);

    while (reader.Read())
    {
        var newItem = new T();
        newItem.SetPropertiesFromReader(reader);
        result.Add(newItem);
    }

    return result;
}

We suggest wrapping it in a cache service for better performance.

We also suggest wrapping it in a retry engine for better reliability.

Finally, we suggest wrapping the call in `Task.Run()` so the blocking query can be awaited, which keeps the calling code in a consistent async style.

The final version might look like this. (Don't copy this code. Use your own retry engine and cache service.)
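
As a rough sketch of how the three wrappers can fit together, the class below combines an in-memory cache, a simple retry loop, and `Task.Run()`. Everything in it is a stand-in: `CachedRetryingQuery<T>`, the 200 ms back-off, and the cache without expiry are assumptions for illustration, and the delegate passed to the constructor plays the role of the synchronous `QueryKQL<T>` above.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for a real cache service and retry engine.
public class CachedRetryingQuery<T>
{
    private readonly Func<string, List<T>> _runQuery;
    private readonly ConcurrentDictionary<string, List<T>> _cache = new();
    private readonly int _maxAttempts;

    public CachedRetryingQuery(Func<string, List<T>> runQuery, int maxAttempts = 3)
    {
        _runQuery = runQuery ?? throw new ArgumentNullException(nameof(runQuery));
        _maxAttempts = maxAttempts;
    }

    public async Task<List<T>> QueryKQLAsync(string query)
    {
        // Cache: identical query text returns the earlier result (no expiry in this sketch).
        if (_cache.TryGetValue(query, out var cached))
        {
            return cached;
        }

        for (var attempt = 1; ; attempt++)
        {
            try
            {
                // Task.Run moves the blocking call off the caller's thread.
                var rows = await Task.Run(() => _runQuery(query));
                _cache[query] = rows;
                return rows;
            }
            catch (Exception) when (attempt < _maxAttempts)
            {
                // Retry: wait a little longer after each failed attempt.
                // The last failure is not caught here and propagates to the caller.
                await Task.Delay(TimeSpan.FromMilliseconds(200 * attempt));
            }
        }
    }
}

public static class Demo
{
    public static async Task Main()
    {
        // A fake query that fails twice before succeeding.
        var calls = 0;
        var query = new CachedRetryingQuery<string>(q =>
        {
            calls++;
            if (calls < 3) throw new InvalidOperationException("transient failure");
            return new List<string> { "row for " + q };
        });

        var first = await query.QueryKQLAsync("Patches | take 1");
        var second = await query.QueryKQLAsync("Patches | take 1");
        Console.WriteLine($"{first[0]}; calls = {calls}; cached = {ReferenceEquals(first, second)}");
    }
}
```

A production cache would normally need expiry and a size limit, and a production retry policy would usually retry only on errors known to be transient.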

Finally, to use it, create a class whose properties match the columns of the expected response row.

Example:

// Sample. Do NOT COPY!
public class PatchEventCore : KustoResponseRow
{
    public DateTime EndTime { get; set; }

    public string Machine { get; set; }

    public string WorkflowResult { get; set; }
}

And query now!

var eventsList = await patchRepo.QueryKQLAsync<PatchEventCore>(@"Patches
	| where PatchId == 'abcd'
	| sort by EndTime
	| project EndTime, Machine, WorkflowResult");
		

Ingest

To ingest a collection into Kusto, first convert it into a format that Kusto can ingest. One common approach is to represent the collection as a DataTable. Here’s how you can do it:

  1. Install the necessary package:

    <PackageReference Include="Microsoft.Azure.Kusto.Ingest" Version="9.2.0" />
    
  2. Create a class to handle the ingestion:

    using System.Data;
    using System.Threading.Tasks;
    using Kusto.Data;
    using Kusto.Ingest;
    
    public class KustoIngestService
    {
        private readonly IKustoIngestClient _kustoIngestClient;
        private readonly string _database;
    
        public KustoIngestService(string kustoUri, string database, string appId, string appKey, string tenantId)
        {
            var kustoConnectionStringBuilder = new KustoConnectionStringBuilder(kustoUri)
                .WithAadApplicationKeyAuthentication(appId, appKey, tenantId);
            _kustoIngestClient = KustoIngestFactory.CreateDirectIngestClient(kustoConnectionStringBuilder);
            _database = database;
        }
    
        public async Task IngestDataAsync(DataTable dataTable, string tableName)
        {
            var ingestionProperties = new KustoIngestionProperties(_database, tableName);
    
            // Stream the table's rows to Kusto through an IDataReader.
            using var reader = dataTable.CreateDataReader();
            await _kustoIngestClient.IngestFromDataReaderAsync(reader, ingestionProperties);
        }
    }
    
  3. Convert your collection to a DataTable:

    using System;
    using System.Collections.Generic;
    using System.ComponentModel;
    using System.Data;
    
    public class MyData
    {
        public int Id { get; set; }
        public string Name { get; set; }
        public DateTime Timestamp { get; set; }
    }
    
    public static class DataTableExtensions
    {
        public static DataTable ToDataTable<T>(this IList<T> data)
        {
            PropertyDescriptorCollection properties = TypeDescriptor.GetProperties(typeof(T));
            DataTable table = new DataTable();
    
            foreach (PropertyDescriptor prop in properties)
            {
                table.Columns.Add(prop.Name, Nullable.GetUnderlyingType(prop.PropertyType) ?? prop.PropertyType);
            }
    
            foreach (T item in data)
            {
                DataRow row = table.NewRow();
                foreach (PropertyDescriptor prop in properties)
                {
                    row[prop.Name] = prop.GetValue(item) ?? DBNull.Value;
                }
                table.Rows.Add(row);
            }
            return table;
        }
    }
    
  4. Use the ingestion service in your application:

    using System;
    using System.Collections.Generic;
    using System.Threading.Tasks;
    
    public class Program
    {
        public static async Task Main(string[] args)
        {
            var kustoUri = "https://yourcluster.kusto.windows.net";
            var database = "yourdatabase";
            var appId = "your-app-id";
            var appKey = "your-app-key";
            var tenantId = "your-tenant-id";
    
            var ingestService = new KustoIngestService(kustoUri, database, appId, appKey, tenantId);
    
            var data = new List<MyData>
            {
                new MyData { Id = 1, Name = "Item1", Timestamp = DateTime.UtcNow },
                new MyData { Id = 2, Name = "Item2", Timestamp = DateTime.UtcNow }
            };
    
            var dataTable = data.ToDataTable();
            await ingestService.IngestDataAsync(dataTable, "your-kusto-table");
        }
    }
    

In this example:

  1. KustoIngestService: Handles the ingestion of data into Kusto.
  2. DataTableExtensions: Provides an extension method to convert a list of objects to a DataTable.
  3. Program: Demonstrates how to use the service to ingest a collection of data into Kusto.

By converting the list to a DataTable and using the ingestion service, you can ingest your collection of data into a Kusto table.
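
Before sending anything to Kusto, it can help to check what the conversion produces. The sketch below repeats the `ToDataTable` extension from step 3 and runs it on a hypothetical `Reading` model with a nullable property. It shows that the nullable column is typed as its underlying type (DataTable columns cannot be `Nullable<T>`) and that a null value is stored as `DBNull`.

```csharp
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;

// Hypothetical model with a nullable column, used only for this check.
public class Reading
{
    public int Id { get; set; }
    public double? Value { get; set; }
}

public static class DataTableExtensions
{
    // Same conversion as in step 3.
    public static DataTable ToDataTable<T>(this IList<T> data)
    {
        PropertyDescriptorCollection properties = TypeDescriptor.GetProperties(typeof(T));
        DataTable table = new DataTable();

        foreach (PropertyDescriptor prop in properties)
        {
            // Unwrap Nullable<T> because DataTable columns cannot hold nullable types.
            table.Columns.Add(prop.Name, Nullable.GetUnderlyingType(prop.PropertyType) ?? prop.PropertyType);
        }

        foreach (T item in data)
        {
            DataRow row = table.NewRow();
            foreach (PropertyDescriptor prop in properties)
            {
                row[prop.Name] = prop.GetValue(item) ?? DBNull.Value;
            }
            table.Rows.Add(row);
        }
        return table;
    }
}

public static class Demo
{
    public static void Main()
    {
        var table = new List<Reading>
        {
            new Reading { Id = 1, Value = 2.5 },
            new Reading { Id = 2, Value = null }
        }.ToDataTable();

        foreach (DataColumn column in table.Columns)
        {
            Console.WriteLine($"{column.ColumnName}: {column.DataType.Name}");
        }

        // The second row has no Value, so the cell holds DBNull.
        Console.WriteLine($"Row 2 Value is null: {table.Rows[1].IsNull("Value")}");
    }
}
```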

Full sample source code:

using System.Collections.Concurrent;
using System.Data;
using Kusto.Cloud.Platform.Data;
using Kusto.Cloud.Platform.Utils;
using Kusto.Data;
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
using Kusto.Ingest;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

public class KustoIngestService
{
    private readonly IKustoIngestClient _ingestClient;
    private readonly ICslAdminProvider _adminClient;
    private readonly string _database;
    private readonly Microsoft.Extensions.Logging.ILogger<KustoIngestService> _logger;

    public KustoIngestService(string kustoUri, string database, string appId, string appKey, string tenantId, Microsoft.Extensions.Logging.ILogger<KustoIngestService> logger)
    {
        var kustoCsb = new KustoConnectionStringBuilder(kustoUri)
            .WithAadApplicationKeyAuthentication(appId, appKey, tenantId);

        _ingestClient = KustoIngestFactory.CreateDirectIngestClient(kustoCsb);
        _adminClient = KustoClientFactory.CreateCslAdminProvider(kustoCsb);
        _database = database;
        _logger = logger;
    }

    public async Task SaveToKustoInBatchAsync<T>(T[] dataToWrite, string table)
    {
        // Ingest in batches of 150 rows so that every row is sent exactly once.
        foreach (var partition in Program.Partition(dataToWrite, 150))
        {
            var dataTable = partition.ToDataTable();
            await IngestDataAsync(dataTable, table);
        }
    }

    public async Task IngestDataAsync(DataTable dataTable, string tableName)
    {
        try
        {
            _logger.LogInformation("Starting ingestion of {RowCount} rows into table {TableName}", dataTable.Rows.Count, tableName);
            var ingestionProperties = new KustoIngestionProperties(_database, tableName);
            await _ingestClient.IngestFromDataReaderAsync(dataTable.CreateDataReader(), ingestionProperties);
            _logger.LogInformation("Successfully ingested {RowCount} rows into table {TableName}", dataTable.Rows.Count, tableName);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error during ingestion into table {TableName}", tableName);
            throw;
        }
    }

    public async Task EnsureTableExistsAsync<T>(string tableName)
    {
        _logger.LogInformation("Ensuring table {TableName} exists.", tableName);

        var schema = await GetDatabaseSchemaAsync();
        if (schema == null)
        {
            _logger.LogWarning("Database schema is null, creating new table {TableName}", tableName);
            await CreateMergeTable<T>(tableName);
        }
        else if (!schema.Tables.TryGetValue(tableName, out var tableSchema))
        {
            _logger.LogInformation("Table {TableName} does not exist. Creating it.", tableName);
            await CreateMergeTable<T>(tableName);
        }
        else
        {
            _logger.LogInformation("Table {TableName} exists. Verifying schema.", tableName);
            var existingColumns = new HashSet<string>(tableSchema.OrderedColumns.Select(c => c.Name));
            var newColumns = typeof(T).GetProperties().Select(p => p.Name).Where(c => !existingColumns.Contains(c));

            if (newColumns.Any())
            {
                _logger.LogWarning("New columns detected in table {TableName}, updating schema.", tableName);
                await CreateMergeTable<T>(tableName);
            }
        }
    }

    private async Task<DatabaseSchema> GetDatabaseSchemaAsync()
    {
        _logger.LogInformation("Fetching database schema for {Database}", _database);
        try
        {
            var command = CslCommandGenerator.GenerateDatabaseSchemaShowAsJsonCommand(_database);
            using var reader = await _adminClient.ExecuteControlCommandAsync(_database, command);
            var schemaJson = reader.ToEnumerable<string>().FirstOrDefault();
            var clusterSchema = JsonConvert.DeserializeObject<ClusterSchema>(schemaJson);
            return clusterSchema?.Databases.GetValueOrDefault(_database);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to retrieve database schema for {Database}", _database);
            throw;
        }
    }

    private async Task CreateMergeTable<T>(string tableName)
    {
        _logger.LogInformation("Creating or merging table schema for {TableName}", tableName);
        var columns = typeof(T).GetProperties().Select(p => ColumnSchema.FromNameAndCslType(p.Name, ConvertTypeToCslType(p.PropertyType)));
        var command = CslCommandGenerator.GenerateTableCreateMergeCommand(new TableSchema(tableName, columns));
        await _adminClient.ExecuteControlCommandAsync(_database, command);
        _logger.LogInformation("Table {TableName} schema created/merged successfully", tableName);
    }

    private static string ConvertTypeToCslType(Type type) => type switch
    {
        _ when type == typeof(bool) => "bool",
        _ when type == typeof(DateTime) => "datetime",
        _ when type == typeof(Guid) => "guid",
        _ when type == typeof(int) => "int",
        _ when type == typeof(long) => "long",
        _ when type == typeof(double) => "real",
        _ when type == typeof(string) => "string",
        _ when type == typeof(TimeSpan) => "timespan",
        _ when type.IsEnum => "string",
        _ when type == typeof(byte[]) => "string",
        _ => "dynamic"
    };
}

// Data model
public class MyData
{
    public int Id { get; set; }
    public string Name { get; set; }
    public DateTime Timestamp { get; set; }
}

// Entry point
public class Program
{
    public static async Task Main()
    {
        var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole());
        var logger = loggerFactory.CreateLogger<KustoIngestService>();

        var kustoUri = "https://testkustoanduin.eastasia.kusto.windows.net";
        var database = "yourdatabase";
        // Placeholders: load real credentials from configuration, never from source code.
        var appId = "your-app-id";
        var appKey = "your-app-key";
        var tenantId = "your-tenant-id";

        var ingestService = new KustoIngestService(
            kustoUri: kustoUri,
            database: database,
            appId: appId,
            appKey: appKey,
            tenantId: tenantId,
            logger: logger
        );

        await ingestService.EnsureTableExistsAsync<MyData>("your-kusto-table");

        Console.WriteLine("Generating records...");
        var data = GenerateTestData(100_000);

        Console.WriteLine("Ingesting data into Kusto...");
        var kustoBuffer = new KustoBuffer<MyData>(ingestService, "your-kusto-table");
        foreach (var partition in Partition(data, 2000))
        {
            kustoBuffer.Add(partition.ToArray());
        }
        Console.ReadLine();
    }

    private static List<MyData> GenerateTestData(int count)
    {
        var data = new List<MyData>(count);
        var now = DateTime.UtcNow;

        Parallel.For(0, count, i =>
        {
            var item = new MyData
            {
                Id = i + 1,
                Name = $"Item_{i}",
                Timestamp = now.AddSeconds(i)
            };

            lock (data) { data.Add(item); }
        });

        return data;
    }

    public static IEnumerable<List<T>> Partition<T>(IEnumerable<T> source, int maxSize)
    {
        if (source == null)
        {
            throw new ArgumentNullException(nameof(source));
        }
        if (maxSize <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(maxSize), "maxSize must be greater than 0.");
        }

        var buffer = new List<T>();
        foreach (var item in source)
        {
            buffer.Add(item);
            if (buffer.Count == maxSize)
            {
                yield return buffer;
                buffer = new List<T>();
            }
        }
        if (buffer.Count > 0)
        {
            yield return buffer;
        }
    }
}

/// <summary>
/// KustoBuffer provides a non-blocking way to buffer incoming data and batch write them to Kusto.
/// This design is inspired by the proven ArrayDb implementation:
/// 
/// https://github.com/AiursoftWeb/ArrayDb/blob/master/src/Aiursoft.ArrayDb.WriteBuffer/BufferedObjectBucket.cs
/// </summary>
public class KustoBuffer<T>
{
    // Dependency: Kusto service for persisting data
    private readonly KustoIngestService _kustoService;
    private readonly string _tableName;

    // Configurable parameters: maximum sleep time when cold and threshold to stop sleeping
    private readonly int _maxSleepMilliSecondsWhenCold;
    private readonly int _stopSleepingWhenWriteBufferItemsMoreThan;

    // Engine tasks controlling the write process
    private Task _engine = Task.CompletedTask;
    private Task _coolDownEngine = Task.CompletedTask;

    // Locks for protecting buffer swapping and engine status switching
    private readonly ReaderWriterLockSlim _bufferLock = new ReaderWriterLockSlim();
    private readonly object _bufferWriteSwapLock = new object();
    private readonly object _engineStatusSwitchLock = new object();

    // Double buffering: _activeBuffer is used for enqueuing new data,
    // while _secondaryBuffer is used for swapping and persisting data.
    private ConcurrentQueue<T> _activeBuffer = new ConcurrentQueue<T>();
    private ConcurrentQueue<T> _secondaryBuffer = new ConcurrentQueue<T>();

    public bool IsCold => _engine.IsCompleted && _coolDownEngine.IsCompleted;

    public bool IsHot => !IsCold;

    public KustoBuffer(
        KustoIngestService kustoService,
        string tableName,
        int maxSleepMilliSecondsWhenCold = 2000,
        int stopSleepingWhenWriteBufferItemsMoreThan = 1000)
    {
        _kustoService = kustoService ?? throw new ArgumentNullException(nameof(kustoService));
        _tableName = tableName;
        _maxSleepMilliSecondsWhenCold = maxSleepMilliSecondsWhenCold;
        _stopSleepingWhenWriteBufferItemsMoreThan = stopSleepingWhenWriteBufferItemsMoreThan;
    }

    private static int CalculateSleepTime(double maxSleepMilliSecondsWhenCold, double stopSleepingWhenWriteBufferItemsMoreThan, int writeBufferItemsCount)
    {
        if (stopSleepingWhenWriteBufferItemsMoreThan <= 0)
        {
            throw new ArgumentException("stopSleepingWhenWriteBufferItemsMoreThan must be a positive number.", nameof(stopSleepingWhenWriteBufferItemsMoreThan));
        }

        if (writeBufferItemsCount > stopSleepingWhenWriteBufferItemsMoreThan)
        {
            return 0;
        }

        var y = maxSleepMilliSecondsWhenCold * (1 - (Math.Log(1 + writeBufferItemsCount) / Math.Log(1 + stopSleepingWhenWriteBufferItemsMoreThan)));
        return (int)y;
    }

    /// <summary>
    /// Adds objects to the buffer in a non-blocking manner.
    /// This method wakes up the engine to persist the data.
    /// </summary>
    public void Add(params T[] objs)
    {
        lock (_bufferWriteSwapLock)
        {
            foreach (var obj in objs)
            {
                _activeBuffer.Enqueue(obj);
            }
        }

        if (objs.Length == 0)
        {
            return;
        }

        // Check the engine status in advance to avoid lock contention.
        if (!IsCold)
        {
            // In a high-frequency environment the engine is usually still running.
            return;
        }

        // Engine might be sleeping. Wake it up.
        lock (_engineStatusSwitchLock)
        {
            // Prevent multiple threads from waking up the engine at the same time.
            if (!IsCold)
            {
                return;
            }

            _engine = Task.Run(WriteBuffered);
        }
    }

    /// <summary>
    /// Persists the buffered data in batches to the Kusto database.
    /// This method is guaranteed to be executed by a single thread at a time.
    /// </summary>
    private async Task WriteBuffered()
    {
        ConcurrentQueue<T> bufferToPersist;

        // ReaderWriterLockSlim must be exited on the thread that entered it,
        // so the lock only covers the swap and is released before the first await.
        _bufferLock.EnterWriteLock();
        try
        {
            lock (_bufferWriteSwapLock)
            {
                // Swap active and secondary buffers
                bufferToPersist = _activeBuffer;
                _activeBuffer = _secondaryBuffer;
                _secondaryBuffer = new ConcurrentQueue<T>();
            }
        }
        finally
        {
            _bufferLock.ExitWriteLock();
        }

        var dataToWrite = bufferToPersist.ToArray();

        // Release the buffer to avoid memory leak.
        bufferToPersist.Clear();

        // If there is data, batch write to Kusto
        if (dataToWrite.Length > 0)
        {
            await _kustoService.SaveToKustoInBatchAsync(dataToWrite, _tableName);
        }

        // While we are writing, new data may be added to the buffer. If so, we need to write it too.
        if (!_activeBuffer.IsEmpty)
        {
            // Restart the engine to write the newly added data.
            // Before the engine quits, it starts the cool-down task.
            // Before the cool-down task quits, it restarts the engine.
            // So while either task is running, the engine is guaranteed to restart, and the buffer is in a hot state.
            // In a hot state, callers do not need to start the engine themselves.
            _coolDownEngine = Task.Run(async () =>
            {
                // Wait a little so that more data can accumulate.
                // Persisting too eagerly while the buffer is almost empty causes many small writes and data fragmentation.
                // Persisting too lazily while a lot of new data has arrived wastes time sleeping.
                // So the sleep time is derived from the number of items in the buffer.
                var sleepTime = CalculateSleepTime(
                    _maxSleepMilliSecondsWhenCold,
                    _stopSleepingWhenWriteBufferItemsMoreThan,
                    _activeBuffer.Count);
                await Task.Delay(sleepTime);

                // Wake up the engine to write the new added data.
                _engine = Task.Run(WriteBuffered);
            });
        }
    }

    /// <summary>
    /// Synchronizes the buffer, ensuring that all pending data is persisted.
    /// </summary>
    public async Task SyncAsync()
    {
        // Case 1:
        // The engine is running. Data added after it swapped buffers will not be written in this round.
        // The cool-down task restarts the engine to write that remaining data,
        // so wait for the engine, the cool-down task, and the restarted engine in turn.

        // Case 2:
        // The engine is not running, but it may be in the cool-down phase.
        // The first await then completes immediately on an already finished task.
        // The cool-down task restarts the engine to write the remaining data,
        // and the final await waits for that engine run to finish.
        await _engine;
        await _coolDownEngine;
        await _engine;
    }
}
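
To get a feel for the delay curve that `CalculateSleepTime` produces, the sketch below copies the formula into a standalone class (`SleepCurve` is a hypothetical name, and the parameter names are shortened) and prints the delay for several buffer sizes, using the `KustoBuffer` constructor defaults of 2000 ms and 1000 items.

```csharp
using System;

public static class SleepCurve
{
    // Same formula as KustoBuffer.CalculateSleepTime, copied here so it can run on its own.
    public static int CalculateSleepTime(double maxSleepMs, double stopSleepingAbove, int bufferedItems)
    {
        if (stopSleepingAbove <= 0)
        {
            throw new ArgumentException("stopSleepingAbove must be a positive number.", nameof(stopSleepingAbove));
        }

        if (bufferedItems > stopSleepingAbove)
        {
            return 0;
        }

        // Logarithmic decay: full sleep at 0 items, no sleep at the threshold.
        var y = maxSleepMs * (1 - (Math.Log(1 + bufferedItems) / Math.Log(1 + stopSleepingAbove)));
        return (int)y;
    }

    public static void Main()
    {
        // Defaults from the KustoBuffer constructor: 2000 ms and 1000 items.
        foreach (var count in new[] { 0, 10, 100, 500, 1000, 5000 })
        {
            Console.WriteLine($"{count,5} buffered items -> sleep {CalculateSleepTime(2000, 1000, count)} ms");
        }
    }
}
```

An empty buffer sleeps the full 2000 ms, the delay falls steeply for the first items because of the logarithm, and it reaches 0 ms once the buffer holds 1000 items or more.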