Build Your First Database Plugin


This is a sample historical database plugin for Stream SCADA that stores time-series data in PostgreSQL.


1. We'll use Visual Studio 2022 (or 2019) for this purpose.



2. The following NuGet package is required.


Name: Npgsql 

Version: 4.0.17




3. C# code:


using System;

using System.Collections.Generic;

using System.ComponentModel;

using System.Data;

using System.Globalization;

using System.Linq;

using System.Text;

using System.Threading.Tasks;

using Npgsql;

using Stream.Common.Shared;


namespace PostgreSqlHistoricalPlugin

{

    /// <summary>

    /// PostgreSQL Historical Database Plugin

    /// Stores historical tag data in a PostgreSQL time-series database

    /// </summary>

    public class PostgreSqlHistoricalPlugin : HistoricalDatabasePluginBase

    {

        private string _connectionString;

        private string _tableName = "datadb";

        private bool _isReady;


        public override string DisplayName => "PostgreSQL Historical Plugin";

        public override string Version => "1.0.0";


        // Plugin Settings (configurable in UI)

        [DefaultValue("localhost")]

        [Description("PostgreSQL server hostname or IP address")]

        public string Host { get; set; } = "localhost";


        [DefaultValue(5432)]

        [Description("PostgreSQL server port number")]

        public int Port { get; set; } = 5432;


        [DefaultValue("streamscada")]

        [Description("Name of the PostgreSQL database")]

        public string DatabaseName { get; set; } = "streamscada";


        [DefaultValue("postgres")]

        [Description("PostgreSQL username")]

        public string Username { get; set; } = "postgres";


        [DefaultValue("")]

        [Description("PostgreSQL password")]

        public string Password { get; set; } = "";


        [DefaultValue(true)]

        [Description("Use SSL connection to PostgreSQL server")]

        public bool UseSSL { get; set; } = true;


        [DefaultValue(30)]

        [Description("Command timeout in seconds")]

        public int CommandTimeout { get; set; } = 30;


        public override async Task Initialize()

        {

            try

            {

                // Build connection string

                var builder = new NpgsqlConnectionStringBuilder

                {

                    Host = Host,

                    Port = Port,

                    Database = DatabaseName,

                    Username = Username,

                    Password = Password,

                    Timeout = CommandTimeout,

                    CommandTimeout = CommandTimeout

                };


                // SSL Mode

                if (UseSSL)

                {

                    builder.SslMode = SslMode.Prefer;

                }

                else

                {

                    builder.SslMode = SslMode.Disable;

                }


                _connectionString = builder.ToString();


                // Test connection

                using (var conn = new NpgsqlConnection(_connectionString))

                {

                    await conn.OpenAsync();

                }


                _isReady = true;

                await Task.CompletedTask;

            }

            catch (Exception ex)

            {

                _isReady = false;

                throw new Exception($"PostgreSQL Plugin Initialize failed: {ex.Message}", ex);

            }

        }


        public override async Task InsertTVSDataAsync_transPeriodicCompact(List<LogStruct> lstLogStruct)

        {

            if (lstLogStruct == null || lstLogStruct.Count == 0)

                return;


            try

            {

                // Lazy initialization on first write

                if (!_isReady)

                {

                    await Initialize();

                }


                if (!_isReady)

                    return;


                // Ensure table exists with correct schema

                await CreateTableIfNotExist_Compact(lstLogStruct);


                using (var conn = new NpgsqlConnection(_connectionString))

                {

                    await conn.OpenAsync();


                    using (var transaction = conn.BeginTransaction())

                    {

                        foreach (var logStruct in lstLogStruct)

                        {

                            if (logStruct.lstTVS == null || logStruct.lstTVS.Count == 0)

                                continue;


                            using (var cmd = conn.CreateCommand())

                            {

                                // Build INSERT statement dynamically based on tags

                                var columnNames = string.Join(", ", logStruct.lstTVS.Select(tvs => $"\"{tvs.TagName}\""));

                                var paramNames = string.Join(", ", logStruct.lstTVS.Select((tvs, idx) => $"@param{idx}"));


                                cmd.CommandText = $"INSERT INTO {_tableName} (timestamp, {columnNames}) VALUES (@TimeStamp, {paramNames})";


                                cmd.Parameters.AddWithValue("@TimeStamp", DateTime.Parse(logStruct.TimeStamp));


                                int paramIdx = 0;

                                foreach (var tvs in logStruct.lstTVS)

                                {

                                    var value = tvs.ScaledValue ?? tvs.RawValue;

                                    cmd.Parameters.AddWithValue($"@param{paramIdx}", value ?? (object)DBNull.Value);

                                    paramIdx++;

                                }


                                await cmd.ExecuteNonQueryAsync();

                            }

                        }


                        await transaction.CommitAsync();

                    }

                }

            }

            catch (Exception ex)

            {

                throw new Exception($"InsertTVSDataAsync_transPeriodicCompact failed: {ex.Message}", ex);

            }

        }


        public override async Task InsertTVSDataAsync_trans2_Detailed(List<LogStruct> lstLogStruct)

        {

            if (lstLogStruct == null || lstLogStruct.Count == 0)

                return;


            try

            {

                // Lazy initialization on first write

                if (!_isReady)

                {

                    await Initialize();

                }


                if (!_isReady)

                    return;


                // Ensure table exists

                await CreateTableIfNotExist_Detailed();


                using (var conn = new NpgsqlConnection(_connectionString))

                {

                    await conn.OpenAsync();


                    using (var transaction = conn.BeginTransaction())

                    {

                        foreach (var logStruct in lstLogStruct)

                        {

                            if (logStruct.lstTVS == null || logStruct.lstTVS.Count == 0)

                                continue;


                            foreach (var tvs in logStruct.lstTVS)

                            {

                                using (var cmd = conn.CreateCommand())

                                {

                                    cmd.CommandText = $@"INSERT INTO {_tableName} 

                                        (timestamp, tagname, rawvalue, scaledvalue, status, comstatus) 

                                        VALUES (@TimeStamp, @TagName, @RawValue, @ScaledValue, @Status, @ComStatus)";


                                    cmd.Parameters.AddWithValue("@TimeStamp", DateTime.Parse(logStruct.TimeStamp));

                                    cmd.Parameters.AddWithValue("@TagName", tvs.TagName);

                                    cmd.Parameters.AddWithValue("@RawValue", tvs.RawValue ?? (object)DBNull.Value);

                                    cmd.Parameters.AddWithValue("@ScaledValue", tvs.ScaledValue ?? (object)DBNull.Value);

                                    cmd.Parameters.AddWithValue("@Status", tvs.Status ?? "");

                                    cmd.Parameters.AddWithValue("@ComStatus", tvs.ComStatus ?? "");


                                    await cmd.ExecuteNonQueryAsync();

                                }

                            }

                        }


                        await transaction.CommitAsync();

                    }

                }

            }

            catch (Exception ex)

            {

                throw new Exception($"InsertTVSDataAsync_trans2_Detailed failed: {ex.Message}", ex);

            }

        }


        public override async Task<FnResponse<List<TrendData>>> GetHistoricalTagValuesAsync_Detailed(

            string tagName, string startDate, string endDate, string aggregation, string groupBy)

        {

            var response = new FnResponse<List<TrendData>>();


            try

            {

                // No initialization needed for reads - database must already exist

                var trendData = new List<TrendData>();


                using (var conn = new NpgsqlConnection(_connectionString))

                {

                    await conn.OpenAsync();


                    string dateGroupBy = GetPostgresGroupByString(groupBy);

                    string aggKey = GetAggregation(aggregation);

                    string query;


                    // Match the pattern from DataDBSqlite

                    if (aggKey != "consumption")

                    {

                        // For RT mode (aggKey = ""), the query becomes: SELECT tagname, timestamp, (scaledvalue) As value

                        // For aggregation modes: SELECT tagname, timestamp, avg(scaledvalue) As value

                        query = $@"SELECT tagname, timestamp, {aggKey}(scaledvalue::numeric) As value 

                            FROM {_tableName} 

                            WHERE LOWER(tagname) = LOWER(@TagName)

                            AND timestamp BETWEEN @StartDate AND @EndDate

                            GROUP BY date_trunc('{dateGroupBy}', timestamp), tagname

                            ORDER BY timestamp";

                    }

                    else

                    {

                        // Consumption = max - min

                        query = $@"SELECT tagname, timestamp, 

                            max(scaledvalue::numeric) - min(scaledvalue::numeric) As value 

                            FROM {_tableName} 

                            WHERE LOWER(tagname) = LOWER(@TagName)

                            AND timestamp BETWEEN @StartDate AND @EndDate

                            GROUP BY date_trunc('{dateGroupBy}', timestamp), tagname

                            ORDER BY timestamp";

                    }


                    using (var cmd = new NpgsqlCommand(query, conn))

                    {

                        cmd.Parameters.AddWithValue("@TagName", tagName);

                        cmd.Parameters.AddWithValue("@StartDate", DateTime.Parse(startDate));

                        cmd.Parameters.AddWithValue("@EndDate", DateTime.Parse(endDate));


                        using (var reader = await cmd.ExecuteReaderAsync())

                        {

                            while (await reader.ReadAsync())

                            {

                                DateTime time = reader.GetDateTime(reader.GetOrdinal("timestamp"));

                                double value = SafeToDouble(reader["value"]);


                                trendData.Add(new TrendData(time, value));

                            }

                        }

                    }

                }


                response.IsOK = true;

                response.Data = trendData;

            }

            catch (Exception ex)

            {

                response.IsOK = false;

                response.ErrorMsg = $"GetHistoricalTagValuesAsync_Detailed failed: {ex.Message}";

            }


            return response;

        }


        public override async Task<FnResponse<List<TrendDataBio>>> GetHistoricalTagValuesAsync_Compact(

            string startDate, string endDate, string aggregation, string groupBy)

        {

            var response = new FnResponse<List<TrendDataBio>>();


            try

            {

                // No initialization needed for reads

                var trendData = new List<TrendDataBio>();


                using (var conn = new NpgsqlConnection(_connectionString))

                {

                    await conn.OpenAsync();


                    string dateGroupBy = GetPostgresGroupByString(groupBy);


                    // Get all columns except timestamp

                    var columns = await GetTableColumns(conn);

                    var dataColumns = columns.Where(c => c.ToLower() != "timestamp").ToList();


                    if (dataColumns.Count == 0)

                    {

                        response.IsOK = true;

                        response.Data = trendData;

                        return response;

                    }


                    // Build query: SELECT * FROM table GROUP BY date_trunc(groupBy, timestamp)

                    var query = $@"SELECT * FROM {_tableName}

                        WHERE timestamp BETWEEN @StartDate AND @EndDate

                        GROUP BY date_trunc('{dateGroupBy}', timestamp), timestamp

                        ORDER BY timestamp";


                    using (var cmd = new NpgsqlCommand(query, conn))

                    {

                        cmd.Parameters.AddWithValue("@StartDate", DateTime.Parse(startDate));

                        cmd.Parameters.AddWithValue("@EndDate", DateTime.Parse(endDate));


                        using (var reader = await cmd.ExecuteReaderAsync())

                        {

                            while (await reader.ReadAsync())

                            {

                                DateTime time = reader.GetDateTime(reader.GetOrdinal("timestamp"));

                                var lstTV = new List<TagValueStatusSlim>();


                                foreach (var col in dataColumns)

                                {

                                    var tagValue = new TagValueStatusSlim

                                    {

                                        TagName = col,

                                        Value = SafeToString(reader[col])

                                    };

                                    lstTV.Add(tagValue);

                                }


                                // TrendDataBio constructor: Sub New(TagName As String, lstTV As List(Of TagValueStatusSlim))

                                // Note: First parameter is TagName (not used in compact mode, pass empty string)

                                var bioData = new TrendDataBio("", lstTV);

                                bioData.Time = time; // Set the time property after construction

                                trendData.Add(bioData);

                            }

                        }

                    }

                }


                response.IsOK = true;

                response.Data = trendData;

            }

            catch (Exception ex)

            {

                response.IsOK = false;

                response.ErrorMsg = $"GetHistoricalTagValuesAsync_Compact failed: {ex.Message}";

            }


            return response;

        }


        public override async Task<FnResponse<List<TrendDataBio>>> GetLastRecordAsync_Compact()

        {

            var response = new FnResponse<List<TrendDataBio>>();


            try

            {

                var trendData = new List<TrendDataBio>();


                using (var conn = new NpgsqlConnection(_connectionString))

                {

                    await conn.OpenAsync();


                    // Get all columns except timestamp

                    var columns = await GetTableColumns(conn);

                    var dataColumns = columns.Where(c => c.ToLower() != "timestamp").ToList();


                    if (dataColumns.Count == 0)

                    {

                        response.IsOK = true;

                        response.Data = trendData;

                        return response;

                    }


                    var query = $"SELECT * FROM {_tableName} ORDER BY timestamp DESC LIMIT 1";


                    using (var cmd = new NpgsqlCommand(query, conn))

                    {

                        using (var reader = await cmd.ExecuteReaderAsync())

                        {

                            if (await reader.ReadAsync())

                            {

                                DateTime time = reader.GetDateTime(reader.GetOrdinal("timestamp"));

                                var lstTV = new List<TagValueStatusSlim>();


                                foreach (var col in dataColumns)

                                {

                                    var tagValue = new TagValueStatusSlim

                                    {

                                        TagName = col,

                                        Value = SafeToString(reader[col])

                                    };

                                    lstTV.Add(tagValue);

                                }


                                var bioData = new TrendDataBio("", lstTV);

                                bioData.Time = time; // Set the time property after construction

                                trendData.Add(bioData);

                            }

                        }

                    }

                }


                response.IsOK = true;

                response.Data = trendData;

            }

            catch (Exception ex)

            {

                response.IsOK = false;

                response.ErrorMsg = $"GetLastRecordAsync_Compact failed: {ex.Message}";

            }


            return response;

        }


        public override async Task CheckToDeleteOldRecords(int numDays)

        {

            try

            {

                using (var conn = new NpgsqlConnection(_connectionString))

                {

                    await conn.OpenAsync();


                    var cutoffDate = DateTime.Now.AddDays(-numDays);

                    var query = $"DELETE FROM {_tableName} WHERE timestamp < @CutoffDate";


                    using (var cmd = new NpgsqlCommand(query, conn))

                    {

                        cmd.Parameters.AddWithValue("@CutoffDate", cutoffDate);

                        await cmd.ExecuteNonQueryAsync();

                    }

                }

            }

            catch (Exception ex)

            {

                throw new Exception($"CheckToDeleteOldRecords failed: {ex.Message}", ex);

            }

        }


        public override void Disconnect()

        {

            _isReady = false;

            // PostgreSQL connection pooling handles cleanup automatically

        }


           public override DataTable GetGroupedTagDataAsDt(

            string filePath, string tagName, string startDate, string endDate, string aggregation, string groupBy)

        {

            var dataTable = new DataTable();

            dataTable.Columns.Add("Time", typeof(string));

            dataTable.Columns.Add("Value", typeof(double));


            try

            {

                if (!_isReady)

                    return dataTable;


                using (var conn = new NpgsqlConnection(_connectionString))

                {

                    conn.Open();


                    string dateGroupBy = GetPostgresGroupByString(groupBy);

                    string aggKey = GetAggregation(aggregation);


                    string query = $@"

                        SELECT date_trunc('{dateGroupBy}', timestamp) as GroupedTime,

                               {aggKey}(scaledvalue::numeric) as AggValue

                        FROM {_tableName}

                        WHERE LOWER(tagname) = LOWER(@TagName)

                          AND timestamp BETWEEN @StartDate AND @EndDate

                        GROUP BY GroupedTime

                        ORDER BY GroupedTime";


                    using (var cmd = new NpgsqlCommand(query, conn))

                    {

                        cmd.Parameters.AddWithValue("@TagName", tagName);

                        cmd.Parameters.AddWithValue("@StartDate", DateTime.Parse(startDate));

                        cmd.Parameters.AddWithValue("@EndDate", DateTime.Parse(endDate));


                        using (var reader = cmd.ExecuteReader())

                        {

                            while (reader.Read())

                            {

                                // PostgreSQL returns timestamp as DateTime object

                                var groupedTime = reader["GroupedTime"];

                                string timeStr = groupedTime is DateTime dtValue 

                                    ? dtValue.ToString("yyyy-MM-dd HH:mm:ss"

                                    : groupedTime.ToString();

                                

                                double value = SafeToDouble(reader["AggValue"]);

                                

                                dataTable.Rows.Add(timeStr, value);

                            }

                        }

                    }

                }

            }

            catch (Exception ex)

            {

                // Log error but return empty DataTable

                Console.WriteLine($"GetGroupedTagDataAsDt error: {ex.Message}");

            }


            return dataTable;

        }


        public override FnResponse<string> ValidateConfiguration()

        {

            if (string.IsNullOrWhiteSpace(Host))

            {

                return new FnResponse<string>

                {

                    IsOK = false,

                    ErrorMsg = "Host cannot be empty",

                    Data = null

                };

            }


            if (Port <= 0 || Port > 65535)

            {

                return new FnResponse<string>

                {

                    IsOK = false,

                    ErrorMsg = "Port must be between 1 and 65535",

                    Data = null

                };

            }


            if (string.IsNullOrWhiteSpace(DatabaseName))

            {

                return new FnResponse<string>

                {

                    IsOK = false,

                    ErrorMsg = "DatabaseName cannot be empty",

                    Data = null

                };

            }


            if (string.IsNullOrWhiteSpace(Username))

            {

                return new FnResponse<string>

                {

                    IsOK = false,

                    ErrorMsg = "Username cannot be empty",

                    Data = null

                };

            }


            return new FnResponse<string>

            {

                IsOK = true,

                ErrorMsg = "Configuration is valid",

                Data = "OK"

            };

        }


        #region Helper Methods


        private async Task CreateTableIfNotExist_Detailed()

        {

            try

            {

                using (var conn = new NpgsqlConnection(_connectionString))

                {

                    await conn.OpenAsync();


                    var createTableSql = $@"

                        CREATE TABLE IF NOT EXISTS {_tableName} (

                            id SERIAL PRIMARY KEY,

                            timestamp TIMESTAMP NOT NULL,

                            tagname VARCHAR(100) NOT NULL,

                            rawvalue VARCHAR(50),

                            scaledvalue VARCHAR(50),

                            status VARCHAR(20),

                            comstatus VARCHAR(20)

                        );

                        CREATE INDEX IF NOT EXISTS idx_{_tableName}_timestamp ON {_tableName}(timestamp);

                        CREATE INDEX IF NOT EXISTS idx_{_tableName}_tagname ON {_tableName}(tagname);

                    ";


                    using (var cmd = new NpgsqlCommand(createTableSql, conn))

                    {

                        await cmd.ExecuteNonQueryAsync();

                    }

                }

            }

            catch (Exception ex)

            {

                throw new Exception($"CreateTableIfNotExist_Detailed failed: {ex.Message}", ex);

            }

        }


        private async Task CreateTableIfNotExist_Compact(List<LogStruct> lstLogStruct)

        {

            try

            {

                if (lstLogStruct == null || lstLogStruct.Count == 0)

                    return;


                var firstLogStruct = lstLogStruct.FirstOrDefault(ls => ls.lstTVS != null && ls.lstTVS.Count > 0);

                if (firstLogStruct == null)

                    return;


                using (var conn = new NpgsqlConnection(_connectionString))

                {

                    await conn.OpenAsync();


                    // Build column definitions

                    var columnDefs = new StringBuilder();

                    columnDefs.Append("id SERIAL PRIMARY KEY, timestamp TIMESTAMP NOT NULL");


                    foreach (var tvs in firstLogStruct.lstTVS)

                    {

                        columnDefs.Append($", \"{tvs.TagName}\" VARCHAR(50)");

                    }


                    var createTableSql = $@"

                        CREATE TABLE IF NOT EXISTS {_tableName} ({columnDefs});

                        CREATE INDEX IF NOT EXISTS idx_{_tableName}_timestamp ON {_tableName}(timestamp);

                    ";


                    using (var cmd = new NpgsqlCommand(createTableSql, conn))

                    {

                        await cmd.ExecuteNonQueryAsync();

                    }

                }

            }

            catch (Exception ex)

            {

                throw new Exception($"CreateTableIfNotExist_Compact failed: {ex.Message}", ex);

            }

        }


        private async Task<List<string>> GetTableColumns(NpgsqlConnection conn)

        {

            var columns = new List<string>();


            try

            {

                var query = $@"

                    SELECT column_name 

                    FROM information_schema.columns 

                    WHERE table_name = '{_tableName}'

                    ORDER BY ordinal_position";


                using (var cmd = new NpgsqlCommand(query, conn))

                {

                    using (var reader = await cmd.ExecuteReaderAsync())

                    {

                        while (await reader.ReadAsync())

                        {

                            columns.Add(reader.GetString(0));

                        }

                    }

                }

            }

            catch (Exception ex)

            {

                // Log error but return empty list to allow operation to continue

                Console.WriteLine($"GetTableColumns error: {ex.Message}");

            }


            return columns;

        }


        private string GetPostgresGroupByString(string groupBy)

        {

            // PostgreSQL date_trunc function parameters

            switch (groupBy?.ToLower())

            {

                case "ms":

                case "millisecond":

                    return "millisecond";

                case "second":

                    return "second";

                case "minute":

                    return "minute";

                case "hour":

                    return "hour";

                case "day":

                    return "day";

                case "month":

                    return "month";

                case "year":

                    return "year";

                default:

                    return "second";

            }

        }


        private string GetAggregation(string aggregation)

        {

            switch (aggregation?.ToLower())

            {

                case "sum": return "sum";

                case "avg": return "avg";

                case "min": return "min";

                case "max": return "max";

                case "count": return "count";

                case "consumption": return "consumption";

                case "rt": return "";      // RT = real-time, no aggregation

                case "none": return "";    // No aggregation

                default: return "";

            }

        }


        private double SafeToDouble(object value)

        {

            if (value == null || value == DBNull.Value)

                return 0.0;


            if (value is double d)

                return d;


            if (value is float f)

                return f;


            if (value is decimal dec)

                return (double)dec;


            if (value is int i)

                return i;


            if (value is long l)

                return l;


            if (double.TryParse(value.ToString(), NumberStyles.Any, CultureInfo.InvariantCulture, out double result))

                return result;


            return 0.0;

        }


        private string SafeToString(object value)

        {

            if (value == null || value == DBNull.Value)

                return "";


            return value.ToString();

        }


        #endregion

    }

}