Build Your First Database Plugin
This is a sample historical database plugin for Stream SCADA that stores time-series data in PostgreSQL.
1. Open Visual Studio 2022 (or 2019) and create a new project for the plugin.

2. Install the following NuGet package in the project:
Name: Npgsql
Version: 4.0.17

3. Add the following C# code to the project:
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Globalization;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Npgsql;
using Stream.Common.Shared;
namespace PostgreSqlHistoricalPlugin
{
/// <summary>
/// PostgreSQL Historical Database Plugin
/// Stores historical tag data in a PostgreSQL time-series database
/// </summary>
public class PostgreSqlHistoricalPlugin : HistoricalDatabasePluginBase
{
/// <summary>Npgsql connection string; assigned by <see cref="Initialize"/> and null until then.</summary>
private string _connectionString;
/// <summary>Name of the table that every SQL statement in this plugin reads from or writes to.</summary>
private string _tableName = "datadb";
/// <summary>True after <see cref="Initialize"/> opened a test connection successfully; reset by <see cref="Disconnect"/> or a failed initialization.</summary>
private bool _isReady;
/// <summary>Human-readable plugin name.</summary>
public override string DisplayName => "PostgreSQL Historical Plugin";
/// <summary>Plugin version string.</summary>
public override string Version => "1.0.0";
// Plugin Settings (configurable in UI)
/// <summary>PostgreSQL server hostname or IP address.</summary>
[DefaultValue("localhost")]
[Description("PostgreSQL server hostname or IP address")]
public string Host { get; set; } = "localhost";
/// <summary>PostgreSQL server port number.</summary>
[DefaultValue(5432)]
[Description("PostgreSQL server port number")]
public int Port { get; set; } = 5432;
/// <summary>Name of the PostgreSQL database to connect to.</summary>
[DefaultValue("streamscada")]
[Description("Name of the PostgreSQL database")]
public string DatabaseName { get; set; } = "streamscada";
/// <summary>PostgreSQL login name.</summary>
[DefaultValue("postgres")]
[Description("PostgreSQL username")]
public string Username { get; set; } = "postgres";
/// <summary>PostgreSQL password; empty by default.</summary>
[DefaultValue("")]
[Description("PostgreSQL password")]
public string Password { get; set; } = "";
/// <summary>
/// When true, <see cref="Initialize"/> selects <c>SslMode.Prefer</c>; when false, <c>SslMode.Disable</c>.
/// </summary>
[DefaultValue(true)]
[Description("Use SSL connection to PostgreSQL server")]
public bool UseSSL { get; set; } = true;
/// <summary>
/// Timeout in seconds. <see cref="Initialize"/> applies this value to both the
/// connection timeout and the command timeout of the connection string.
/// </summary>
[DefaultValue(30)]
[Description("Command timeout in seconds")]
public int CommandTimeout { get; set; } = 30;
/// <summary>
/// Builds the Npgsql connection string from the plugin settings and verifies it
/// by opening (and immediately disposing) one connection.
/// </summary>
/// <remarks>
/// On success the connection string is cached in <c>_connectionString</c> and the
/// plugin is marked ready. On any failure the plugin is marked not ready and the
/// original error is rethrown wrapped in an <see cref="Exception"/>.
/// </remarks>
/// <exception cref="Exception">The settings are rejected by the builder or the test connection fails.</exception>
public override async Task Initialize()
{
    try
    {
        var builder = new NpgsqlConnectionStringBuilder
        {
            Host = Host,
            Port = Port,
            Database = DatabaseName,
            Username = Username,
            Password = Password,
            // One setting drives both the connect timeout and the per-command timeout.
            Timeout = CommandTimeout,
            CommandTimeout = CommandTimeout,
            // NOTE(review): SslMode.Prefer is what the "Use SSL" switch maps to;
            // confirm against the Npgsql docs that this is strict enough for the deployment.
            SslMode = UseSSL ? SslMode.Prefer : SslMode.Disable
        };
        _connectionString = builder.ToString();

        // Test connection: open once so bad settings surface here rather than on first write.
        using (var conn = new NpgsqlConnection(_connectionString))
        {
            await conn.OpenAsync();
        }

        _isReady = true;
    }
    catch (Exception ex)
    {
        _isReady = false;
        throw new Exception($"PostgreSQL Plugin Initialize failed: {ex.Message}", ex);
    }
}
/// <summary>
/// Writes a batch of records in "compact" layout: one row per record, one column per tag.
/// </summary>
/// <param name="lstLogStruct">Records to store; a null or empty list is a no-op.</param>
/// <remarks>
/// Initializes the plugin lazily, makes sure the table exists, then inserts every
/// record that has at least one tag value inside a single transaction. For each tag
/// the scaled value is stored, falling back to the raw value when the scaled value is null.
/// </remarks>
/// <exception cref="Exception">Any failure, wrapped with the method name as prefix.</exception>
public override async Task InsertTVSDataAsync_transPeriodicCompact(List<LogStruct> lstLogStruct)
{
    if (lstLogStruct == null || lstLogStruct.Count == 0)
        return;
    try
    {
        // Lazy initialization on first write
        if (!_isReady)
        {
            await Initialize();
        }
        if (!_isReady)
            return;

        // Ensure table exists with correct schema
        await CreateTableIfNotExist_Compact(lstLogStruct);

        using (var conn = new NpgsqlConnection(_connectionString))
        {
            await conn.OpenAsync();
            using (var transaction = conn.BeginTransaction())
            {
                foreach (var logStruct in lstLogStruct)
                {
                    if (logStruct.lstTVS == null || logStruct.lstTVS.Count == 0)
                        continue;

                    using (var cmd = conn.CreateCommand())
                    {
                        // Tag names become quoted identifiers. An embedded double quote must be
                        // doubled, otherwise it terminates the identifier and the remainder of
                        // the tag name is parsed as SQL.
                        var columnNames = string.Join(", ", logStruct.lstTVS.Select(
                            tvs => "\"" + $"{tvs.TagName}".Replace("\"", "\"\"") + "\""));
                        var paramNames = string.Join(", ", logStruct.lstTVS.Select((tvs, idx) => $"@param{idx}"));

                        cmd.CommandText = $"INSERT INTO {_tableName} (timestamp, {columnNames}) VALUES (@TimeStamp, {paramNames})";
                        cmd.Parameters.AddWithValue("@TimeStamp", DateTime.Parse(logStruct.TimeStamp));

                        int paramIdx = 0;
                        foreach (var tvs in logStruct.lstTVS)
                        {
                            // Prefer the scaled value; fall back to raw, then to SQL NULL.
                            var value = tvs.ScaledValue ?? tvs.RawValue;
                            cmd.Parameters.AddWithValue($"@param{paramIdx}", value ?? (object)DBNull.Value);
                            paramIdx++;
                        }

                        await cmd.ExecuteNonQueryAsync();
                    }
                }
                await transaction.CommitAsync();
            }
        }
    }
    catch (Exception ex)
    {
        throw new Exception($"InsertTVSDataAsync_transPeriodicCompact failed: {ex.Message}", ex);
    }
}
/// <summary>
/// Writes a batch of records in "detailed" layout: one row per tag value, carrying
/// timestamp, tag name, raw value, scaled value, status and communication status.
/// </summary>
/// <param name="lstLogStruct">Records to store; a null or empty list is a no-op.</param>
/// <remarks>
/// Initializes the plugin lazily, makes sure the table exists, then inserts all rows
/// inside a single transaction.
/// </remarks>
/// <exception cref="Exception">Any failure, wrapped with the method name as prefix.</exception>
public override async Task InsertTVSDataAsync_trans2_Detailed(List<LogStruct> lstLogStruct)
{
    if (lstLogStruct == null || lstLogStruct.Count == 0)
        return;
    try
    {
        // Lazy initialization on first write
        if (!_isReady)
        {
            await Initialize();
        }
        if (!_isReady)
            return;

        // Ensure table exists
        await CreateTableIfNotExist_Detailed();

        using (var conn = new NpgsqlConnection(_connectionString))
        {
            await conn.OpenAsync();
            using (var transaction = conn.BeginTransaction())
            {
                foreach (var logStruct in lstLogStruct)
                {
                    if (logStruct.lstTVS == null || logStruct.lstTVS.Count == 0)
                        continue;

                    // The timestamp is shared by every tag of the record: parse it once
                    // instead of once per tag.
                    DateTime recordTime = DateTime.Parse(logStruct.TimeStamp);

                    foreach (var tvs in logStruct.lstTVS)
                    {
                        using (var cmd = conn.CreateCommand())
                        {
                            cmd.CommandText = $@"INSERT INTO {_tableName}
(timestamp, tagname, rawvalue, scaledvalue, status, comstatus)
VALUES (@TimeStamp, @TagName, @RawValue, @ScaledValue, @Status, @ComStatus)";
                            cmd.Parameters.AddWithValue("@TimeStamp", recordTime);
                            cmd.Parameters.AddWithValue("@TagName", tvs.TagName);
                            cmd.Parameters.AddWithValue("@RawValue", tvs.RawValue ?? (object)DBNull.Value);
                            cmd.Parameters.AddWithValue("@ScaledValue", tvs.ScaledValue ?? (object)DBNull.Value);
                            cmd.Parameters.AddWithValue("@Status", tvs.Status ?? "");
                            cmd.Parameters.AddWithValue("@ComStatus", tvs.ComStatus ?? "");
                            await cmd.ExecuteNonQueryAsync();
                        }
                    }
                }
                await transaction.CommitAsync();
            }
        }
    }
    catch (Exception ex)
    {
        throw new Exception($"InsertTVSDataAsync_trans2_Detailed failed: {ex.Message}", ex);
    }
}
/// <summary>
/// Reads the history of one tag from the detailed-layout table, bucketed by time.
/// </summary>
/// <param name="tagName">Tag to query; matched case-insensitively.</param>
/// <param name="startDate">Inclusive range start, parsed with <see cref="DateTime.Parse(string)"/>.</param>
/// <param name="endDate">Inclusive range end, parsed with <see cref="DateTime.Parse(string)"/>.</param>
/// <param name="aggregation">sum, avg, min, max, count, consumption (max - min), or rt/none for no aggregation.</param>
/// <param name="groupBy">Bucket size (ms, second, minute, hour, day, month, year); unknown values fall back to second.</param>
/// <returns>
/// A response whose <c>Data</c> holds one point per time bucket. With an aggregation the
/// point time is the bucket start; without one it is the timestamp of the first sample in
/// the bucket. On failure <c>IsOK</c> is false and <c>ErrorMsg</c> carries the reason.
/// </returns>
public override async Task<FnResponse<List<TrendData>>> GetHistoricalTagValuesAsync_Detailed(
    string tagName, string startDate, string endDate, string aggregation, string groupBy)
{
    var response = new FnResponse<List<TrendData>>();
    try
    {
        // No initialization needed for reads - database must already exist
        var trendData = new List<TrendData>();
        using (var conn = new NpgsqlConnection(_connectionString))
        {
            await conn.OpenAsync();

            // Both helpers return values from a fixed whitelist, so interpolating
            // them into the SQL text cannot inject anything.
            string bucketExpr = $"date_trunc('{GetPostgresGroupByString(groupBy)}', timestamp)";
            string aggKey = GetAggregation(aggregation);
            string query;

            if (string.IsNullOrEmpty(aggKey))
            {
                // RT / no aggregation: PostgreSQL (unlike SQLite) rejects bare columns under
                // GROUP BY, so take the first sample of each bucket with DISTINCT ON instead.
                query = $@"SELECT DISTINCT ON ({bucketExpr}) timestamp AS sample_time, scaledvalue::numeric AS value
                    FROM {_tableName}
                    WHERE LOWER(tagname) = LOWER(@TagName)
                      AND timestamp BETWEEN @StartDate AND @EndDate
                    ORDER BY {bucketExpr}, timestamp";
            }
            else
            {
                // Consumption = max - min; every other key is a plain SQL aggregate.
                string valueExpr = aggKey == "consumption"
                    ? "max(scaledvalue::numeric) - min(scaledvalue::numeric)"
                    : $"{aggKey}(scaledvalue::numeric)";

                // Select the bucket itself: a bare "timestamp" is neither grouped nor
                // aggregated and makes PostgreSQL reject the whole statement.
                query = $@"SELECT {bucketExpr} AS sample_time, {valueExpr} AS value
                    FROM {_tableName}
                    WHERE LOWER(tagname) = LOWER(@TagName)
                      AND timestamp BETWEEN @StartDate AND @EndDate
                    GROUP BY {bucketExpr}
                    ORDER BY {bucketExpr}";
            }

            using (var cmd = new NpgsqlCommand(query, conn))
            {
                cmd.Parameters.AddWithValue("@TagName", tagName);
                cmd.Parameters.AddWithValue("@StartDate", DateTime.Parse(startDate));
                cmd.Parameters.AddWithValue("@EndDate", DateTime.Parse(endDate));
                using (var reader = await cmd.ExecuteReaderAsync())
                {
                    int timeOrdinal = reader.GetOrdinal("sample_time");
                    while (await reader.ReadAsync())
                    {
                        DateTime time = reader.GetDateTime(timeOrdinal);
                        double value = SafeToDouble(reader["value"]);
                        trendData.Add(new TrendData(time, value));
                    }
                }
            }
        }
        response.IsOK = true;
        response.Data = trendData;
    }
    catch (Exception ex)
    {
        response.IsOK = false;
        response.ErrorMsg = $"GetHistoricalTagValuesAsync_Detailed failed: {ex.Message}";
    }
    return response;
}
/// <summary>
/// Reads compact-layout history (one column per tag), returning one row per time bucket.
/// </summary>
/// <param name="startDate">Inclusive range start, parsed with <see cref="DateTime.Parse(string)"/>.</param>
/// <param name="endDate">Inclusive range end, parsed with <see cref="DateTime.Parse(string)"/>.</param>
/// <param name="aggregation">Not used by this method.</param>
/// <param name="groupBy">Bucket size (ms, second, minute, hour, day, month, year); unknown values fall back to second.</param>
/// <returns>
/// A response whose <c>Data</c> holds, for each bucket, the earliest stored row with every
/// tag column rendered as a string. Empty when the table has no tag columns. On failure
/// <c>IsOK</c> is false and <c>ErrorMsg</c> carries the reason.
/// </returns>
public override async Task<FnResponse<List<TrendDataBio>>> GetHistoricalTagValuesAsync_Compact(
    string startDate, string endDate, string aggregation, string groupBy)
{
    var response = new FnResponse<List<TrendDataBio>>();
    try
    {
        // No initialization needed for reads
        var trendData = new List<TrendDataBio>();
        using (var conn = new NpgsqlConnection(_connectionString))
        {
            await conn.OpenAsync();

            // The helper returns a value from a fixed whitelist, safe to interpolate.
            string bucketExpr = $"date_trunc('{GetPostgresGroupByString(groupBy)}', timestamp)";

            // Tag columns = everything except the bookkeeping columns that the
            // CREATE TABLE statement of this plugin adds (surrogate key and timestamp).
            var columns = await GetTableColumns(conn);
            var dataColumns = columns
                .Where(c => !string.Equals(c, "timestamp", StringComparison.OrdinalIgnoreCase)
                         && !string.Equals(c, "id", StringComparison.OrdinalIgnoreCase))
                .ToList();
            if (dataColumns.Count == 0)
            {
                response.IsOK = true;
                response.Data = trendData;
                return response;
            }

            // "SELECT * ... GROUP BY bucket" is rejected by PostgreSQL because the other
            // columns are neither grouped nor aggregated. DISTINCT ON keeps exactly one
            // row (the earliest) per bucket, which is what the grouping was meant to do.
            var query = $@"SELECT DISTINCT ON ({bucketExpr}) *
                FROM {_tableName}
                WHERE timestamp BETWEEN @StartDate AND @EndDate
                ORDER BY {bucketExpr}, timestamp";

            using (var cmd = new NpgsqlCommand(query, conn))
            {
                cmd.Parameters.AddWithValue("@StartDate", DateTime.Parse(startDate));
                cmd.Parameters.AddWithValue("@EndDate", DateTime.Parse(endDate));
                using (var reader = await cmd.ExecuteReaderAsync())
                {
                    int timeOrdinal = reader.GetOrdinal("timestamp");
                    while (await reader.ReadAsync())
                    {
                        DateTime time = reader.GetDateTime(timeOrdinal);
                        var lstTV = new List<TagValueStatusSlim>();
                        foreach (var col in dataColumns)
                        {
                            lstTV.Add(new TagValueStatusSlim
                            {
                                TagName = col,
                                Value = SafeToString(reader[col])
                            });
                        }
                        // The first constructor argument is a tag name; compact rows carry
                        // all tags in lstTV, so an empty string is passed.
                        var bioData = new TrendDataBio("", lstTV);
                        bioData.Time = time; // Set the time property after construction
                        trendData.Add(bioData);
                    }
                }
            }
        }
        response.IsOK = true;
        response.Data = trendData;
    }
    catch (Exception ex)
    {
        response.IsOK = false;
        response.ErrorMsg = $"GetHistoricalTagValuesAsync_Compact failed: {ex.Message}";
    }
    return response;
}
/// <summary>
/// Returns the most recent compact-layout row (highest timestamp).
/// </summary>
/// <returns>
/// A response whose <c>Data</c> holds zero or one entry, with every tag column rendered
/// as a string. Empty when the table has no tag columns or no rows. On failure
/// <c>IsOK</c> is false and <c>ErrorMsg</c> carries the reason.
/// </returns>
public override async Task<FnResponse<List<TrendDataBio>>> GetLastRecordAsync_Compact()
{
    var response = new FnResponse<List<TrendDataBio>>();
    try
    {
        var trendData = new List<TrendDataBio>();
        using (var conn = new NpgsqlConnection(_connectionString))
        {
            await conn.OpenAsync();

            // Tag columns = everything except the bookkeeping columns that the
            // CREATE TABLE statement of this plugin adds (surrogate key and timestamp).
            // Ordinal comparison keeps the filter independent of the thread culture.
            var columns = await GetTableColumns(conn);
            var dataColumns = columns
                .Where(c => !string.Equals(c, "timestamp", StringComparison.OrdinalIgnoreCase)
                         && !string.Equals(c, "id", StringComparison.OrdinalIgnoreCase))
                .ToList();
            if (dataColumns.Count == 0)
            {
                response.IsOK = true;
                response.Data = trendData;
                return response;
            }

            var query = $"SELECT * FROM {_tableName} ORDER BY timestamp DESC LIMIT 1";
            using (var cmd = new NpgsqlCommand(query, conn))
            using (var reader = await cmd.ExecuteReaderAsync())
            {
                if (await reader.ReadAsync())
                {
                    DateTime time = reader.GetDateTime(reader.GetOrdinal("timestamp"));
                    var lstTV = new List<TagValueStatusSlim>();
                    foreach (var col in dataColumns)
                    {
                        lstTV.Add(new TagValueStatusSlim
                        {
                            TagName = col,
                            Value = SafeToString(reader[col])
                        });
                    }
                    var bioData = new TrendDataBio("", lstTV);
                    bioData.Time = time; // Set the time property after construction
                    trendData.Add(bioData);
                }
            }
        }
        response.IsOK = true;
        response.Data = trendData;
    }
    catch (Exception ex)
    {
        response.IsOK = false;
        response.ErrorMsg = $"GetLastRecordAsync_Compact failed: {ex.Message}";
    }
    return response;
}
/// <summary>
/// Deletes every row whose timestamp is older than <paramref name="numDays"/> days
/// before the current local time.
/// </summary>
/// <param name="numDays">Retention period in days. Values of zero or less are ignored.</param>
/// <exception cref="Exception">Any failure, wrapped with the method name as prefix.</exception>
public override async Task CheckToDeleteOldRecords(int numDays)
{
    // A zero or negative retention puts the cutoff at or after "now", which would
    // wipe the entire history. Treat it as "nothing to purge" instead.
    // NOTE(review): confirm the host never relies on 0 meaning "purge everything".
    if (numDays <= 0)
        return;

    try
    {
        using (var conn = new NpgsqlConnection(_connectionString))
        {
            await conn.OpenAsync();
            var cutoffDate = DateTime.Now.AddDays(-numDays);
            var query = $"DELETE FROM {_tableName} WHERE timestamp < @CutoffDate";
            using (var cmd = new NpgsqlCommand(query, conn))
            {
                cmd.Parameters.AddWithValue("@CutoffDate", cutoffDate);
                await cmd.ExecuteNonQueryAsync();
            }
        }
    }
    catch (Exception ex)
    {
        throw new Exception($"CheckToDeleteOldRecords failed: {ex.Message}", ex);
    }
}
/// <summary>
/// Marks the plugin as not ready so that the next write re-runs <see cref="Initialize"/>.
/// No connection is held between calls, so there is nothing to close here.
/// </summary>
public override void Disconnect() => _isReady = false;
/// <summary>
/// Synchronously reads the bucketed history of one tag from the detailed-layout table
/// into a <see cref="DataTable"/> with a string "Time" column and a double "Value" column.
/// </summary>
/// <param name="filePath">Not used by this method.</param>
/// <param name="tagName">Tag to query; matched case-insensitively.</param>
/// <param name="startDate">Inclusive range start, parsed with <see cref="DateTime.Parse(string)"/>.</param>
/// <param name="endDate">Inclusive range end, parsed with <see cref="DateTime.Parse(string)"/>.</param>
/// <param name="aggregation">sum, avg, min, max, count, consumption (max - min), or rt/none for no aggregation.</param>
/// <param name="groupBy">Bucket size (ms, second, minute, hour, day, month, year); unknown values fall back to second.</param>
/// <returns>
/// One row per time bucket, time formatted as <c>yyyy-MM-dd HH:mm:ss</c>. The table is
/// empty when the plugin is not ready or when the query fails (the error is written to
/// the console rather than thrown).
/// </returns>
public override DataTable GetGroupedTagDataAsDt(
    string filePath, string tagName, string startDate, string endDate, string aggregation, string groupBy)
{
    var dataTable = new DataTable();
    dataTable.Columns.Add("Time", typeof(string));
    dataTable.Columns.Add("Value", typeof(double));
    try
    {
        if (!_isReady)
            return dataTable;

        using (var conn = new NpgsqlConnection(_connectionString))
        {
            conn.Open();

            // Both helpers return values from a fixed whitelist, safe to interpolate.
            string bucketExpr = $"date_trunc('{GetPostgresGroupByString(groupBy)}', timestamp)";
            string aggKey = GetAggregation(aggregation);
            string query;

            if (string.IsNullOrEmpty(aggKey))
            {
                // RT / no aggregation: a non-aggregated value under GROUP BY is invalid in
                // PostgreSQL, so pick the first sample of each bucket with DISTINCT ON.
                query = $@"
                    SELECT DISTINCT ON ({bucketExpr}) {bucketExpr} AS GroupedTime,
                           scaledvalue::numeric AS AggValue
                    FROM {_tableName}
                    WHERE LOWER(tagname) = LOWER(@TagName)
                      AND timestamp BETWEEN @StartDate AND @EndDate
                    ORDER BY {bucketExpr}, timestamp";
            }
            else
            {
                // "consumption" is not a SQL function; it is defined as max - min.
                string valueExpr = aggKey == "consumption"
                    ? "max(scaledvalue::numeric) - min(scaledvalue::numeric)"
                    : $"{aggKey}(scaledvalue::numeric)";
                query = $@"
                    SELECT {bucketExpr} AS GroupedTime,
                           {valueExpr} AS AggValue
                    FROM {_tableName}
                    WHERE LOWER(tagname) = LOWER(@TagName)
                      AND timestamp BETWEEN @StartDate AND @EndDate
                    GROUP BY {bucketExpr}
                    ORDER BY {bucketExpr}";
            }

            using (var cmd = new NpgsqlCommand(query, conn))
            {
                cmd.Parameters.AddWithValue("@TagName", tagName);
                cmd.Parameters.AddWithValue("@StartDate", DateTime.Parse(startDate));
                cmd.Parameters.AddWithValue("@EndDate", DateTime.Parse(endDate));
                using (var reader = cmd.ExecuteReader())
                {
                    while (reader.Read())
                    {
                        var groupedTime = reader["GroupedTime"];
                        // Invariant culture keeps the separators and the calendar fixed
                        // regardless of the regional settings of the host machine.
                        string timeStr = groupedTime is DateTime dtValue
                            ? dtValue.ToString("yyyy-MM-dd HH:mm:ss", CultureInfo.InvariantCulture)
                            : Convert.ToString(groupedTime, CultureInfo.InvariantCulture);
                        double value = SafeToDouble(reader["AggValue"]);
                        dataTable.Rows.Add(timeStr, value);
                    }
                }
            }
        }
    }
    catch (Exception ex)
    {
        // Log error but return empty DataTable
        Console.WriteLine($"GetGroupedTagDataAsDt error: {ex.Message}");
    }
    return dataTable;
}
/// <summary>
/// Checks the connection settings without touching the database.
/// </summary>
/// <returns>
/// A failed response naming the first offending setting (Host, Port, DatabaseName,
/// Username - checked in that order), or a successful response with <c>Data</c> = "OK".
/// </returns>
public override FnResponse<string> ValidateConfiguration()
{
    // Find the first rule that is violated, if any.
    string problem = null;
    if (string.IsNullOrWhiteSpace(Host))
        problem = "Host cannot be empty";
    else if (Port <= 0 || Port > 65535)
        problem = "Port must be between 1 and 65535";
    else if (string.IsNullOrWhiteSpace(DatabaseName))
        problem = "DatabaseName cannot be empty";
    else if (string.IsNullOrWhiteSpace(Username))
        problem = "Username cannot be empty";

    var result = new FnResponse<string>();
    if (problem != null)
    {
        result.IsOK = false;
        result.ErrorMsg = problem;
        result.Data = null;
    }
    else
    {
        result.IsOK = true;
        result.ErrorMsg = "Configuration is valid";
        result.Data = "OK";
    }
    return result;
}
#region Helper Methods
/// <summary>
/// Creates the detailed-layout table (one row per tag value) and its timestamp and
/// tag-name indexes when they do not exist yet.
/// </summary>
/// <exception cref="Exception">Any failure, wrapped with the method name as prefix.</exception>
private async Task CreateTableIfNotExist_Detailed()
{
    try
    {
        // The DDL text is kept flush-left so the statement sent to the server is unchanged.
        string ddl = $@"
CREATE TABLE IF NOT EXISTS {_tableName} (
id SERIAL PRIMARY KEY,
timestamp TIMESTAMP NOT NULL,
tagname VARCHAR(100) NOT NULL,
rawvalue VARCHAR(50),
scaledvalue VARCHAR(50),
status VARCHAR(20),
comstatus VARCHAR(20)
);
CREATE INDEX IF NOT EXISTS idx_{_tableName}_timestamp ON {_tableName}(timestamp);
CREATE INDEX IF NOT EXISTS idx_{_tableName}_tagname ON {_tableName}(tagname);
";
        using (var connection = new NpgsqlConnection(_connectionString))
        {
            await connection.OpenAsync();
            using (var ddlCommand = new NpgsqlCommand(ddl, connection))
            {
                await ddlCommand.ExecuteNonQueryAsync();
            }
        }
    }
    catch (Exception ex)
    {
        throw new Exception($"CreateTableIfNotExist_Detailed failed: {ex.Message}", ex);
    }
}
/// <summary>
/// Creates the compact-layout table (one VARCHAR(50) column per tag) and its timestamp
/// index when they do not exist yet.
/// </summary>
/// <param name="lstLogStruct">
/// Batch about to be written. The column list is taken from the first record that has
/// tag values; a null or empty batch, or one without tag values, is a no-op.
/// </param>
/// <remarks>
/// An existing table is left untouched, so tags that first appear after the table was
/// created do not get a column here.
/// </remarks>
/// <exception cref="Exception">Any failure, wrapped with the method name as prefix.</exception>
private async Task CreateTableIfNotExist_Compact(List<LogStruct> lstLogStruct)
{
    try
    {
        if (lstLogStruct == null || lstLogStruct.Count == 0)
            return;

        var firstLogStruct = lstLogStruct.FirstOrDefault(ls => ls.lstTVS != null && ls.lstTVS.Count > 0);
        if (firstLogStruct == null)
            return;

        using (var conn = new NpgsqlConnection(_connectionString))
        {
            await conn.OpenAsync();

            // Build column definitions
            var columnDefs = new StringBuilder();
            columnDefs.Append("id SERIAL PRIMARY KEY, timestamp TIMESTAMP NOT NULL");
            foreach (var tvs in firstLogStruct.lstTVS)
            {
                // Tag names become quoted identifiers. An embedded double quote must be
                // doubled, otherwise it terminates the identifier and the remainder of
                // the tag name is parsed as SQL.
                string quotedTag = "\"" + $"{tvs.TagName}".Replace("\"", "\"\"") + "\"";
                columnDefs.Append(", ").Append(quotedTag).Append(" VARCHAR(50)");
            }

            var createTableSql = $@"
                CREATE TABLE IF NOT EXISTS {_tableName} ({columnDefs});
                CREATE INDEX IF NOT EXISTS idx_{_tableName}_timestamp ON {_tableName}(timestamp);
                ";
            using (var cmd = new NpgsqlCommand(createTableSql, conn))
            {
                await cmd.ExecuteNonQueryAsync();
            }
        }
    }
    catch (Exception ex)
    {
        throw new Exception($"CreateTableIfNotExist_Compact failed: {ex.Message}", ex);
    }
}
/// <summary>
/// Lists the column names of the plugin table in their declared order.
/// </summary>
/// <param name="conn">An already opened connection to run the lookup on.</param>
/// <returns>
/// Column names ordered by ordinal position. Empty when the table does not exist or the
/// lookup fails (the error is written to the console rather than thrown).
/// </returns>
private async Task<List<string>> GetTableColumns(NpgsqlConnection conn)
{
    var columns = new List<string>();
    try
    {
        // Restrict the lookup to the current schema: information_schema.columns spans
        // every schema, so a same-named table elsewhere would otherwise contribute
        // columns that this table does not have. The name is bound as a parameter
        // instead of being spliced into the SQL text.
        const string query = @"
            SELECT column_name
            FROM information_schema.columns
            WHERE table_schema = current_schema()
              AND table_name = @TableName
            ORDER BY ordinal_position";
        using (var cmd = new NpgsqlCommand(query, conn))
        {
            cmd.Parameters.AddWithValue("@TableName", _tableName);
            using (var reader = await cmd.ExecuteReaderAsync())
            {
                while (await reader.ReadAsync())
                {
                    columns.Add(reader.GetString(0));
                }
            }
        }
    }
    catch (Exception ex)
    {
        // Log error but return empty list to allow operation to continue
        Console.WriteLine($"GetTableColumns error: {ex.Message}");
    }
    return columns;
}
/// <summary>
/// Maps a caller-supplied bucket name to the field name passed to PostgreSQL's
/// <c>date_trunc</c>.
/// </summary>
/// <param name="groupBy">Bucket name, case-insensitive; may be null.</param>
/// <returns>
/// One of millisecond, second, minute, hour, day, month, year. Null or unrecognized
/// input yields "second". The result always comes from this fixed list, so callers can
/// interpolate it into SQL text.
/// </returns>
private string GetPostgresGroupByString(string groupBy)
{
    // Invariant lower-casing: under cultures such as tr-TR, ToLower() turns "I" into a
    // dotless "ı", so "MINUTE" would miss its case label and fall through to the default.
    switch (groupBy?.ToLowerInvariant())
    {
        case "ms":
        case "millisecond":
            return "millisecond";
        case "second":
            return "second";
        case "minute":
            return "minute";
        case "hour":
            return "hour";
        case "day":
            return "day";
        case "month":
            return "month";
        case "year":
            return "year";
        default:
            return "second";
    }
}
/// <summary>
/// Normalizes a caller-supplied aggregation name.
/// </summary>
/// <param name="aggregation">Aggregation name, case-insensitive; may be null.</param>
/// <returns>
/// "sum", "avg", "min", "max", "count" or "consumption" when recognized; an empty string
/// for "rt", "none", null and anything else (meaning: no aggregation). The result always
/// comes from this fixed list, so callers can interpolate it into SQL text.
/// </returns>
private string GetAggregation(string aggregation)
{
    // Invariant lower-casing: under cultures such as tr-TR, ToLower() turns "I" into a
    // dotless "ı", so "MIN" would not match "min" and silently disable aggregation.
    switch (aggregation?.ToLowerInvariant())
    {
        case "sum": return "sum";
        case "avg": return "avg";
        case "min": return "min";
        case "max": return "max";
        case "count": return "count";
        case "consumption": return "consumption";
        case "rt": return ""; // RT = real-time, no aggregation
        case "none": return ""; // No aggregation
        default: return "";
    }
}
/// <summary>
/// Converts a database value to <see cref="double"/> without throwing.
/// </summary>
/// <param name="value">Any boxed value, null or <see cref="DBNull.Value"/>.</param>
/// <returns>
/// The numeric value for double, float, decimal, int and long; for anything else the
/// invariant-culture parse of its string form; 0.0 for null, DBNull or unparsable text.
/// </returns>
private double SafeToDouble(object value)
{
    if (value == null || value == DBNull.Value)
        return 0.0;

    // Fast path for the numeric types handled directly.
    switch (value)
    {
        case double asDouble:
            return asDouble;
        case float asFloat:
            return asFloat;
        case decimal asDecimal:
            return (double)asDecimal;
        case int asInt:
            return asInt;
        case long asLong:
            return asLong;
    }

    // Everything else goes through its text form.
    double parsed;
    bool ok = double.TryParse(value.ToString(), NumberStyles.Any, CultureInfo.InvariantCulture, out parsed);
    return ok ? parsed : 0.0;
}
/// <summary>
/// Converts a database value to text, mapping null and <see cref="DBNull.Value"/> to
/// an empty string.
/// </summary>
/// <param name="value">Any boxed value, null or <see cref="DBNull.Value"/>.</param>
/// <returns>An empty string for null/DBNull, otherwise <c>value.ToString()</c>.</returns>
private string SafeToString(object value)
{
    bool hasValue = value != null && value != DBNull.Value;
    return hasValue ? value.ToString() : "";
}
#endregion
}
}