I want to compress a large table containing historical data that is rarely, if ever, read. I first tried the built-in compression options (row, page, columnstore, columnstore archive), but none of them can compress values stored off-row (varchar(max), nvarchar(max)), so I finally tried a CLR solution.
The "SQL Server Compressed Rowset" solution compresses the entire rowset returned by a given query using a custom CLR type.
For example:
-- Archive table: one compressed rowset per row, stamped with the UTC insertion time.
CREATE TABLE Archive
(
    [Date] DATETIME2 DEFAULT (GETUTCDATE()),
    [Data] [dbo].[CompressedRowset]
);

-- Compress the full contents of [dbo].[A] into a single archived value.
INSERT INTO Archive ([Data])
SELECT [dbo].[CompressQueryResults]('SELECT * FROM [dbo].[A]');
It works, but I ran into the following problems:
When I try to compress a large result set, I get the following error:
Msg 0, Level 11, State 0, Line 0 A severe error occurred on the current command. The results, if any, should be discarded.
In addition, the following statement works:
-- Returning the compressed value directly to the client works, even for the large table.
SELECT [dbo].[CompressQueryResults]('SELECT * FROM [dbo].[LargeA]');
but the following statements do not:
-- Storing the compressed value of the large table into a table fails with the severe error.
-- The column list is required: Archive has two columns and only [Data] is supplied.
INSERT INTO Archive ([Data])
SELECT [dbo].[CompressQueryResults]('SELECT * FROM [dbo].[LargeA]');

-- Assigning the compressed value of the large table to a variable fails the same way.
DECLARE @A [dbo].[CompressedRowset];
SELECT @A = [dbo].[CompressQueryResults]('SELECT * FROM [dbo].[LargeA]');
To compress a rowset, each T-SQL type must be mapped to a .NET type. Unfortunately, this is not possible for every SQL type (see "Mapping CLR Parameter Data"). I have already extended the following function to handle more types, but how should I handle types such as geography?
// Excerpt of the CLR-type-to-SqlDbType mapping; the "..." marks branches omitted here.
// Any type that reaches the final else is rejected with NotImplementedException.
static SqlDbType ToSqlType(Type t){ if (t == typeof(int)){ return SqlDbType.Int; } ... if (t == typeof(Byte[])){ return SqlDbType.VarBinary; } else { throw new NotImplementedException("CLR Type " + t.Name + " Not supported for conversion"); } }
Here is the whole .net code:
using System;
using System.Data;
using System.Data.SqlClient;
using System.Data.SqlTypes;
using Microsoft.SqlServer.Server;
using System.IO;
using System.Runtime.Serialization.Formatters.Binary;
using System.IO.Compression;
using System.Xml.Serialization;
using System.Xml;

/// <summary>
/// SQL CLR user-defined type that holds a <see cref="DataTable"/> and persists it as a
/// GZip-compressed, binary-serialized blob. The string/XML representation is the
/// XmlSerializer form of the table.
/// </summary>
[Serializable]
[Microsoft.SqlServer.Server.SqlUserDefinedType(
    Format.UserDefined,
    IsByteOrdered = false,
    IsFixedLength = false,
    MaxByteSize = -1)]
public struct CompressedRowset : INullable, IBinarySerialize, IXmlSerializable
{
    DataTable rowset;

    /// <summary>The wrapped rowset; null when this instance represents SQL NULL.</summary>
    public DataTable Data
    {
        get { return this.rowset; }
        set { this.rowset = value; }
    }

    /// <summary>
    /// Returns the XML form of the rowset, or the literal "NULL" for a null instance
    /// (previously a null instance caused a NullReferenceException).
    /// </summary>
    public override string ToString()
    {
        if (this.IsNull)
        {
            return "NULL";
        }

        using (var sw = new StringWriter())
        using (var xw = new XmlTextWriter(sw))
        {
            WriteXml(xw);
            xw.Flush();
            sw.Flush();
            return sw.ToString();
        }
    }

    /// <summary>True when no rowset is attached.</summary>
    public bool IsNull
    {
        get { return this.rowset == null; }
    }

    /// <summary>A null instance (no rowset attached).</summary>
    public static CompressedRowset Null
    {
        get { return new CompressedRowset(); }
    }

    /// <summary>
    /// Builds an instance from the XML text produced by <see cref="ToString"/>.
    /// A NULL <paramref name="s"/> yields <see cref="Null"/> instead of throwing.
    /// </summary>
    /// <param name="s">XML representation of the rowset, or SQL NULL.</param>
    /// <returns>The parsed instance.</returns>
    public static CompressedRowset Parse(SqlString s)
    {
        // SqlString.Value throws on NULL, so NULL must be handled before reading it.
        if (s.IsNull)
        {
            return Null;
        }

        using (var sr = new StringReader(s.Value))
        using (var xr = new XmlTextReader(sr))
        {
            var parsed = new CompressedRowset();
            parsed.ReadXml(xr);
            return parsed;
        }
    }

    #region "Stream Wrappers"

    /// <summary>
    /// Base for the forward-only adapters below: not seekable, Flush is a no-op, and
    /// Length/Position/Seek/SetLength are not implemented.
    /// </summary>
    abstract class WrapperStream : Stream
    {
        public override bool CanSeek { get { return false; } }
        public override bool CanWrite { get { return false; } }
        public override void Flush() { }
        public override long Length { get { throw new NotImplementedException(); } }

        public override long Position
        {
            get { throw new NotImplementedException(); }
            set { throw new NotImplementedException(); }
        }

        public override long Seek(long offset, SeekOrigin origin) { throw new NotImplementedException(); }
        public override void SetLength(long value) { throw new NotImplementedException(); }
    }

    /// <summary>Write-only Stream view over a BinaryWriter.</summary>
    class BinaryWriterStream : WrapperStream
    {
        readonly BinaryWriter writer;

        public BinaryWriterStream(BinaryWriter br) { this.writer = br; }

        public override bool CanRead { get { return false; } }
        public override bool CanWrite { get { return true; } }

        public override int Read(byte[] buffer, int offset, int count) { throw new NotImplementedException(); }

        public override void Write(byte[] buffer, int offset, int count)
        {
            writer.Write(buffer, offset, count);
        }
    }

    /// <summary>Read-only Stream view over a BinaryReader.</summary>
    class BinaryReaderStream : WrapperStream
    {
        readonly BinaryReader reader;

        public BinaryReaderStream(BinaryReader br) { this.reader = br; }

        public override bool CanRead { get { return true; } }
        public override bool CanWrite { get { return false; } }

        public override int Read(byte[] buffer, int offset, int count)
        {
            return reader.Read(buffer, offset, count);
        }

        public override void Write(byte[] buffer, int offset, int count) { throw new NotImplementedException(); }
    }

    #endregion

    #region "IBinarySerialize"

    /// <summary>Restores the rowset from its GZip-compressed, BinaryFormatter-serialized form.</summary>
    /// <param name="r">Reader positioned at the start of the serialized value.</param>
    public void Read(System.IO.BinaryReader r)
    {
        using (var source = new BinaryReaderStream(r))
        using (var gzip = new GZipStream(source, CompressionMode.Decompress))
        {
            // NOTE(review): BinaryFormatter deserialization is unsafe for untrusted bytes.
            // It is kept because the stored format depends on it; only values written by
            // this type should ever be read back.
            var formatter = new BinaryFormatter();
            this.rowset = (DataTable)formatter.Deserialize(gzip);
        }
    }

    /// <summary>Writes the rowset as a GZip-compressed, BinaryFormatter-serialized stream; a null instance writes nothing.</summary>
    /// <param name="w">Writer that receives the serialized value.</param>
    public void Write(System.IO.BinaryWriter w)
    {
        if (this.IsNull)
        {
            return;
        }

        // Binary remoting format instead of the default XML representation of the DataTable.
        rowset.RemotingFormat = SerializationFormat.Binary;
        var formatter = new BinaryFormatter();
        using (var target = new BinaryWriterStream(w))
        using (var gzip = new GZipStream(target, CompressionMode.Compress))
        {
            formatter.Serialize(gzip, rowset);
        }
    }

    #endregion

    /// <summary>
    /// Runs an arbitrary query on the context connection and wraps its first result set
    /// in a <see cref="CompressedRowset"/>.
    /// If the query has a large result set, this uses a large amount of memory to buffer
    /// the results in a DataTable.
    /// </summary>
    /// <param name="query">
    /// T-SQL text to execute. It is executed as given, so it must come from a trusted caller.
    /// NULL or empty text yields a null instance.
    /// </param>
    /// <returns>The buffered result set, or <see cref="Null"/> when no query text was supplied.</returns>
    //[Microsoft.SqlServer.Server.SqlProcedure]
    [SqlFunction(DataAccess = DataAccessKind.Read, SystemDataAccess = SystemDataAccessKind.Read, IsDeterministic = false, IsPrecise = false)]
    public static CompressedRowset CompressQueryResults(string query)
    {
        // Without command text ExecuteReader would fail; NULL in, NULL out instead.
        if (String.IsNullOrEmpty(query))
        {
            return Null;
        }

        var table = new DataTable();

        // Open a context connection; the command is disposed together with it.
        using (var con = new SqlConnection("Context Connection=true"))
        using (var cmd = new SqlCommand(query, con))
        {
            con.Open();
            using (var rdr = cmd.ExecuteReader())
            {
                table.Load(rdr);
            }
        }

        // Configure the DataTable for binary serialization.
        table.RemotingFormat = SerializationFormat.Binary;

        var result = new CompressedRowset();
        result.rowset = table;
        return result;
    }

    /// <summary>
    /// Partial type mapping from .NET column types to SQL types.
    /// </summary>
    /// <param name="t">The .NET type of a DataTable column.</param>
    /// <returns>The SqlDbType used to describe that column in a result set.</returns>
    /// <exception cref="NotImplementedException">The type has no mapping (for example CLR UDTs such as geography).</exception>
    static SqlDbType ToSqlType(Type t)
    {
        if (t == typeof(int)) { return SqlDbType.Int; }
        if (t == typeof(string)) { return SqlDbType.NVarChar; }
        if (t == typeof(Boolean)) { return SqlDbType.Bit; }
        if (t == typeof(decimal)) { return SqlDbType.Decimal; }
        if (t == typeof(float)) { return SqlDbType.Real; }
        if (t == typeof(double)) { return SqlDbType.Float; }
        // datetime2 covers the full DateTime range and precision; datetime would reject
        // values before 1753 and round to ~3 ms, corrupting archived date/datetime2 data.
        if (t == typeof(DateTime)) { return SqlDbType.DateTime2; }
        if (t == typeof(DateTimeOffset)) { return SqlDbType.DateTimeOffset; }
        if (t == typeof(TimeSpan)) { return SqlDbType.Time; }
        if (t == typeof(Int64)) { return SqlDbType.BigInt; }
        if (t == typeof(Int16)) { return SqlDbType.SmallInt; }
        if (t == typeof(byte)) { return SqlDbType.TinyInt; }
        if (t == typeof(Guid)) { return SqlDbType.UniqueIdentifier; }
        if (t == typeof(Byte[])) { return SqlDbType.VarBinary; }
        // sql_variant columns are loaded as System.Object.
        if (t == typeof(object)) { return SqlDbType.Variant; }

        throw new NotImplementedException("CLR Type " + t.Name + " Not supported for conversion");
    }

    /// <summary>
    /// Converts a DataColumn.MaxLength into a length SqlMetaData accepts: values that are
    /// unset (non-positive) or above <paramref name="limit"/> become SqlMetaData.Max.
    /// </summary>
    static long ToMetaDataLength(int maxLength, int limit)
    {
        return (maxLength > 0 && maxLength <= limit) ? maxLength : SqlMetaData.Max;
    }

    /// <summary>
    /// Builds decimal metadata whose precision and scale are wide enough for every value
    /// in the column, instead of the default precision/scale that drops fractional digits.
    /// </summary>
    static SqlMetaData CreateDecimalMetaData(DataColumn col)
    {
        const int MaxPrecision = 38;
        int integerDigits = 1;
        int scale = 0;

        foreach (DataRow row in col.Table.Rows)
        {
            object value = row[col];
            if (value == null || value == DBNull.Value)
            {
                continue;
            }

            var sqlValue = new SqlDecimal((decimal)value);
            scale = Math.Max(scale, sqlValue.Scale);
            integerDigits = Math.Max(integerDigits, sqlValue.Precision - sqlValue.Scale);
        }

        // Keep the integer part intact; give up trailing fractional digits if the total
        // would exceed the maximum SQL precision.
        if (integerDigits + scale > MaxPrecision)
        {
            scale = MaxPrecision - integerDigits;
        }

        return new SqlMetaData(col.ColumnName, SqlDbType.Decimal, (byte)(integerDigits + scale), (byte)scale);
    }

    /// <summary>
    /// This stored procedure takes a compressed DataTable and returns it as a result set to the client
    /// or into a table using insert ... exec ...
    /// </summary>
    /// <param name="results">The value to expand; a null instance produces no result set.</param>
    [Microsoft.SqlServer.Server.SqlProcedure]
    public static void UnCompressRowset(CompressedRowset results)
    {
        if (results.IsNull)
        {
            return;
        }

        DataTable dt = results.rowset;
        var fields = new SqlMetaData[dt.Columns.Count];
        for (int i = 0; i < dt.Columns.Count; i++)
        {
            var col = dt.Columns[i];
            var sqlType = ToSqlType(col.DataType);
            var colName = col.ColumnName;

            if (sqlType == SqlDbType.NVarChar)
            {
                // nvarchar lengths above 4000 are only expressible as max.
                fields[i] = new SqlMetaData(colName, sqlType, ToMetaDataLength(col.MaxLength, 4000));
            }
            else if (sqlType == SqlDbType.VarBinary)
            {
                // varbinary lengths above 8000 are only expressible as max.
                fields[i] = new SqlMetaData(colName, sqlType, ToMetaDataLength(col.MaxLength, 8000));
            }
            else if (sqlType == SqlDbType.Decimal)
            {
                fields[i] = CreateDecimalMetaData(col);
            }
            else
            {
                fields[i] = new SqlMetaData(colName, sqlType);
            }
        }

        var record = new SqlDataRecord(fields);
        SqlContext.Pipe.SendResultsStart(record);
        foreach (DataRow row in dt.Rows)
        {
            record.SetValues(row.ItemArray);
            SqlContext.Pipe.SendResultsRow(record);
        }
        SqlContext.Pipe.SendResultsEnd();
    }

    /// <summary>No schema is published for this type.</summary>
    public System.Xml.Schema.XmlSchema GetSchema()
    {
        return null;
    }

    /// <summary>Loads the rowset from its XmlSerializer representation.</summary>
    /// <param name="reader">Reader positioned on the serialized DataTable.</param>
    /// <exception cref="InvalidOperationException">A rowset is already attached.</exception>
    public void ReadXml(System.Xml.XmlReader reader)
    {
        if (rowset != null)
        {
            throw new InvalidOperationException("rowset already read");
        }

        var ser = new XmlSerializer(typeof(DataTable));
        rowset = (DataTable)ser.Deserialize(reader);
    }

    /// <summary>
    /// Writes the rowset with XmlSerializer; a null instance writes nothing.
    /// An unnamed table is named "Rows" first because a DataTable needs a name to be serialized.
    /// </summary>
    /// <param name="writer">Writer that receives the XML.</param>
    public void WriteXml(System.Xml.XmlWriter writer)
    {
        if (rowset == null)
        {
            return;
        }

        if (String.IsNullOrEmpty(rowset.TableName))
        {
            rowset.TableName = "Rows";
        }

        var ser = new XmlSerializer(typeof(DataTable));
        ser.Serialize(writer, rowset);
    }
}
and here is the creation of the SQL objects:
-- Register the CLR user-defined type.
CREATE TYPE [dbo].[CompressedRowset]
EXTERNAL NAME [CompressedRowset].[CompressedRowset];
GO

-- Function that runs a query and returns its result set as a CompressedRowset.
CREATE FUNCTION [dbo].[CompressQueryResults] (@query [nvarchar](4000))
RETURNS [dbo].[CompressedRowset]
AS EXTERNAL NAME [CompressedRowset].[CompressedRowset].[CompressQueryResults];
GO

-- Procedure that expands a CompressedRowset back into a result set.
CREATE PROCEDURE [dbo].[UnCompressRowset]
    @results [dbo].[CompressedRowset]
AS EXTERNAL NAME [CompressedRowset].[CompressedRowset].[UnCompressRowset];
GO