Streaming Data from an Application into SQL


  1. Prelude to a Test
  2. The Typical Method
  3. The Newer And Awesomer Method
  4. The Double-Plus Good Method
  5. Test Overview





    ==Prelude to a Test==
    There are many new and coolio features in Microsoft SQL Server 2008 (such as FINALLY being able to initialize a variable when you declare it), and the term "streaming" has been thrown around quite a bit. Nearly all of the articles that include the word "streaming" refer to the FILESTREAM feature of storing BLOB data on the filesystem. This article, however, is NOT about the FILESTREAM feature (which is fine because there are plenty of those already). Instead, this article is about using Table-Valued Parameters (TVPs) to stream data from an application into the database.


    Many articles have been written to cover the basic topic of what Table-Valued Parameters are and how to use them to pass data between Stored Procedures, so I will not go into that here. What I am going to focus on is using TVPs to pass data from an application to the database. There are two ways of passing data to a TVP: sending the data all at once, and streaming the data as you receive it. The first way, sending all of the data at once, has been covered in a few articles, but I will include it here for completeness and to compare it with the fully-streamed method. The streaming method is mentioned in a few places, but I have not been able to find any articles about it, most likely because it is not applicable as often as sending all of the data at once. In fact, the only example of it that I have seen, and which I used as a reference for this article, was a session at the SQL PASS 2008 conference presented by Himanshu Vasishth (see References section for details).










    ==The Typical Method==
    Prior to SQL Server 2008, the usual way to send data to a Stored Procedure was one row at a time. So we have all created the basic Insert, Update, and Delete Stored Procedures that accept a single row's data to work on. Of course, for Delete operations some people pass in a comma-separated list and split it out in the database using a User-Defined Function, but that doesn't work for Insert and Update. If there is only one row to operate on, then there really is no issue of efficiency. But often enough we have a set of data that we want to work with at one time. Whether we are creating a matching set of data (e.g. creating a single Order Header record and several Order Detail records associated with it) or loading a set of data (e.g. a data import or maybe persisting a .Net Grid View), we are forced to iterate over that set of data. And in the Header / Detail situation, you have to create the Header record first and then create the Detail records based on the ID of the Header record. All of these calls add up in terms of network traffic, and the length of the operation contributes to blocking if the application opened a transaction. If the application did not open a transaction, then each statement is its own transaction, which puts a greater load on the server than bulk / multi-row operations do. The only benefit here is that if you are importing a large dataset from disk or another source, you can load each row into memory as you need it and hence consume very little memory. But this benefit does nothing to reduce the time that the operation takes.










    ==The Newer And Awesomer Method==
    SQL Server 2008 introduced the ability to pass real datasets to Stored Procedures using Table-Valued Parameters (TVPs). TVPs allow you to pass in a strongly-typed collection of data (i.e. a table). So now we can make a single call to the database to pass in all relevant records. This cuts down on network traffic and allows us both to combine operations, such as the Order Header and Order Details, into a single call and to make use of multi-row DML operations, which are much more efficient than iterative statements. In most cases this method will consume the same amount of memory as the iterative method, since you will likely already have the data collected at the moment you send it to the database. The difference here is that you can make a single call to the database to send it all in, which is a cleaner programming model and can be an order of magnitude faster, especially as you work with larger sets of data. The only downside is that if you are importing a large dataset from disk or another source, you cannot read that data in as you pass it to the database, and hence an operation might consume too much memory before you get a chance to send it all at once.
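
    As a rough sketch of the client-side half of this approach, the rows can be collected into a DataTable whose columns line up with the table type. The column names below match the listing later in this article; the ImportTableBuilder class and its Build method are hypothetical names used only for illustration. The filled table would then be assigned to a parameter whose SqlDbType is Structured, which is what the NormalImport method in the listing does.

```csharp
using System;
using System.Collections.Generic;
using System.Data;

// Hypothetical helper (not part of the original listing).
public static class ImportTableBuilder
{
    // Builds an in-memory table with the same two columns that the
    // article's listing uses: an integer ID and a text value.
    public static DataTable Build(IEnumerable<string> tabDelimitedLines)
    {
        DataTable table = new DataTable();
        table.Columns.Add("SillyIDField", typeof(int));
        table.Columns.Add("MeaninglessText", typeof(string));

        foreach (string line in tabDelimitedLines)
        {
            // Each line is assumed to hold "<integer><TAB><text>".
            string[] columns = line.Split('\t');
            table.Rows.Add(int.Parse(columns[0]), columns[1]);
        }

        return table;
    }
}
```

    Note that every row is held in memory before anything is sent, which is exactly the trade-off described above.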










    ==The Double-Plus Good Method==
    So the feature that is either not mentioned or is only alluded to but never shown (same as with this article so far ;-)) is the ability not only to send all of the data at once to a TVP (and gain the efficiency of a single network call with a multi-row operation) but also to keep the memory footprint low. That's right: by doing a little extra work (not too much) you can stream an entire set of data to SQL Server 2008 as it is being read into your application. To be fair, this method does not really offer any benefit over the standard TVP approach (using a DataTable) if you already have all of the data in memory and there is no way to get it in small chunks. However, for operations that load large sets of data from disk, a Web Service call, a query result from another database, etc., this method allows you to start passing the data to SQL Server as you receive it. You get the low-memory benefit of the iterative method plus the network and database efficiency of the basic TVP method.
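
    The low memory footprint comes from how C# iterator methods behave: the body of a method that uses "yield return" runs only as far as the next row each time the consumer asks for one. The sketch below illustrates that behavior on its own, without a database. CountingReader and LazyRows are hypothetical names introduced for this illustration, and the rows are plain key/value pairs rather than the SqlDataRecord objects that the full listing produces.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Hypothetical reader that counts how many lines have been pulled from it.
public sealed class CountingReader : StringReader
{
    public int LinesRead { get; private set; }

    public CountingReader(string text) : base(text) { }

    public override string ReadLine()
    {
        string line = base.ReadLine();
        if (line != null) { LinesRead++; }
        return line;
    }
}

public static class LazyRows
{
    // Iterator method: nothing is read when Read() is called; the loop
    // advances one line per request from the consumer.
    public static IEnumerable<KeyValuePair<int, string>> Read(TextReader reader)
    {
        string line;
        while ((line = reader.ReadLine()) != null)
        {
            string[] columns = line.Split('\t');
            yield return new KeyValuePair<int, string>(int.Parse(columns[0]), columns[1]);
        }
    }
}
```

    With a three-line input, pulling the first row from the enumerator reads exactly one line from the reader; the remaining lines stay unread until they are requested. Only the row currently being handed over needs to be in memory.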



    But wait, there's more! Well, no there isn't. But now we can get to the technical details (i.e. the fun stuff).










    ==Test Overview==
    The example code that is shown below (and attached to this article at the bottom in the "Resources" section) is a single C# file (a Console Application) that covers four different methods:


  1. The "old school" (option = old) method of importing a text file iteratively by calling a Stored Procedure that accepts parameters for each of the columns,
  2. The "standard TVP" (option = normal) method of importing the text file entirely and then calling the TVP-based Stored Procedure once to Insert the data by passing in a DataTable,
  3. The "half-streaming" (option = half) method of reading all of the data into memory (same as the "standard TVP" method) but then calling the TVP-based Stored Procedure once while streaming the data from the DataTable using IEnumerable, and
  4. The "full-streaming" (option = full) method of reading the data one record at a time from the text file and streaming each record to the TVP-based Stored Procedure in a single call by using IEnumerable.


    I have also attached the compiled .exe file of the code so that you don't need to compile this yourself if you want to run the same test. Just rename the file to end in .exe instead of .ex_ so that it runs.



    Please note that I fully realize that the example code below is not 100% optimized in that there are a few places where I repeat blocks of code. This is by design since the purpose of this code is to be educational and I wanted to make it clear for people to see each of the four methods and the code required for each method without confusing the issue for people who might not be used to reading C#.
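
    All four options read the same kind of input: the parsing code in the listing splits each line on a tab character and expects an integer followed by a text value that fits a 50-character column. A test file along those lines could be generated with a small sketch like the following. The SampleDataWriter class, the row contents, and the file name in the closing comment are made up for illustration and are not part of the original download.

```csharp
using System;
using System.IO;

// Hypothetical helper for producing input in the shape the import code parses.
public static class SampleDataWriter
{
    // Writes rowCount lines of "<integer><TAB><text>" to the given writer.
    public static void Write(TextWriter writer, int rowCount)
    {
        for (int id = 1; id <= rowCount; id++)
        {
            writer.Write(id);
            writer.Write('\t');
            writer.Write("Row number " + id); // stays well under 50 characters
            writer.Write('\n');
        }
    }
}

// Example use (the file name is arbitrary):
//   using (StreamWriter file = new StreamWriter("ImportTest.txt"))
//   {
//       SampleDataWriter.Write(file, 100000);
//   }
```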








// Copyright (c) April 2009 by Solomon Rutzky
// This script goes along with the article "Streaming Data Into SQL Server 2008 From an Application"
// that is featured on http://www.SQLServerCentral.com/Authors/Articles/Solomon_Rutzky/294002/

// Usage: SQLServer2008_StreamingTest { Normal | Half | Full | Old } "FilePath"

using System;
using System.Collections.Generic; // generic IEnumerable<T> and IEnumerator<T>
using System.Data.SqlClient;      // SqlConnection, SqlCommand, and SqlParameter
using System.Data;                // DataTable and SqlDbType
using System.IO;                  // StreamReader
using System.Collections;         // non-generic IEnumerator and IEnumerable
using Microsoft.SqlServer.Server; // SqlDataRecord and SqlMetaData

namespace SQLServer2008_StreamingTest
{
    class Program
    {
        static int Main(string[] args)
        {
            bool error = false;
            int returnCode = 0;

            if (args.Length < 2)
            {
                error = true;
                returnCode = 1;
            }
            else
            {
                // The first argument selects the import method; the second is the data file
                switch (args[0].ToLower())
                {
                    case "normal":
                        TestSetup();
                        returnCode = NormalImport(args[1]);
                        break;
                    case "half":
                        TestSetup();
                        returnCode = HalfStreamedImport(args[1]);
                        break;
                    case "full":
                        TestSetup();
                        returnCode = FullStreamedImport(args[1]);
                        break;
                    case "old":
                        TestSetup();
                        returnCode = OldSchoolImport(args[1]);
                        break;
                    default:
                        error = true;
                        returnCode = 2;
                        break;
                }
            }

            Console.WriteLine("{0} Done!", DateTime.Now);

            if (error)
            {
                Console.WriteLine("Usage: SQLServer2008_StreamingTest { Normal | Half | Full | Old } \"FilePath\"");
            }

            return returnCode;
        }

        // Resets the target table and clears the caches so each test starts from the same state
        static void TestSetup()
        {
            try
            {
                // "using" closes and disposes the connection, so no separate Close() call is needed
                using (SqlConnection dbConnection = new SqlConnection("Server=localhost;Database=Test;Trusted_Connection=True;"))
                {
                    dbConnection.Open();

                    SqlCommand importProc = new SqlCommand("CHECKPOINT ; TRUNCATE TABLE dbo.ImportTest ; DBCC FREEPROCCACHE ; DBCC FREESYSTEMCACHE('ALL') ; CHECKPOINT ;", dbConnection);
                    importProc.CommandType = CommandType.Text;

                    Console.WriteLine("{0} Truncating the import table, clearing the caches, etc...", DateTime.Now);
                    importProc.ExecuteNonQuery();
                    Console.WriteLine("{0} Done setting up for the test", DateTime.Now);
                }
            }
            catch (Exception exception)
            {
                Console.WriteLine("Error: {0}", exception.Message);
            }
        }

        // Option "old": read the whole file, then call the single-row proc once per row
        static int OldSchoolImport(string FilePath)
        {
            int returnCode = 0;
            DateTime startTime;
            DataTable tempTable = new DataTable();
            double elapsedSeconds;

            tempTable.Columns.Add("SillyIDField", typeof(Int32));
            tempTable.Columns.Add("MeaninglessText", typeof(string));

            try
            {
                Console.WriteLine("{0} Starting to read data file...", DateTime.Now);
                // "using" disposes the reader; a Close() in a finally block would throw
                // a NullReferenceException if the file could not be opened
                using (StreamReader fileReader = new StreamReader(FilePath))
                {
                    string inputRow = "";
                    string[] inputColumns = new string[2];

                    while (!fileReader.EndOfStream)
                    {
                        inputRow = fileReader.ReadLine();
                        inputColumns = inputRow.Split('\t');

                        //Console.WriteLine("Field1 = {0} ; Field2 = {1}", inputColumns[0], inputColumns[1]);
                        tempTable.LoadDataRow(inputColumns, false);
                    }
                }
                Console.WriteLine("{0} Done reading data file; {1} records read into memory", DateTime.Now, tempTable.Rows.Count);
            }
            catch (Exception exception)
            {
                Console.WriteLine("Error: {0}", exception.Message);
                returnCode = 3;
            }

            try
            {
                startTime = DateTime.Now;

                Console.WriteLine("{0} Connecting to the Database...", DateTime.Now);
                using (SqlConnection dbConnection = new SqlConnection("Server=localhost;Database=Test;Trusted_Connection=True;"))
                {
                    dbConnection.Open();

                    Console.WriteLine("{0} Calling the proc iteratively ...", DateTime.Now);
                    for (int rowNum = 0; rowNum < tempTable.Rows.Count; rowNum++)
                    {
                        // One round trip to the server per row
                        SqlCommand importProc = new SqlCommand("ImportDataOldSchool", dbConnection);
                        importProc.CommandType = CommandType.StoredProcedure;
                        SqlParameter sillyIDField = importProc.Parameters.AddWithValue("@SillyIDField", (int)tempTable.Rows[rowNum].ItemArray.GetValue(0));
                        sillyIDField.SqlDbType = SqlDbType.Int;
                        SqlParameter meaninglessText = importProc.Parameters.AddWithValue("@MeaninglessText", (string)tempTable.Rows[rowNum].ItemArray.GetValue(1));
                        meaninglessText.SqlDbType = SqlDbType.VarChar;

                        importProc.ExecuteNonQuery();
                    }
                }

                elapsedSeconds = ((Convert.ToDouble(DateTime.Now.Ticks - startTime.Ticks)) / 10000000.0D);
                Console.WriteLine("{0} Done importing the data old-school style ({1} Seconds)", DateTime.Now, elapsedSeconds);
            }
            catch (Exception exception)
            {
                Console.WriteLine("Error: {0}", exception.Message);
                returnCode = 4;
            }

            return returnCode;
        }

        // Option "normal": read the whole file, then pass the DataTable to the TVP proc in one call
        static int NormalImport(string FilePath)
        {
            int returnCode = 0;
            DateTime startTime;
            DataTable tempTable = new DataTable();
            double elapsedSeconds;

            tempTable.Columns.Add("SillyIDField", typeof(Int32));
            tempTable.Columns.Add("MeaninglessText", typeof(string));

            try
            {
                Console.WriteLine("{0} Starting to read data file...", DateTime.Now);
                // "using" disposes the reader; a Close() in a finally block would throw
                // a NullReferenceException if the file could not be opened
                using (StreamReader fileReader = new StreamReader(FilePath))
                {
                    string inputRow = "";
                    string[] inputColumns = new string[2];

                    while (!fileReader.EndOfStream)
                    {
                        inputRow = fileReader.ReadLine();
                        inputColumns = inputRow.Split('\t');

                        //Console.WriteLine("Field1 = {0} ; Field2 = {1}", inputColumns[0], inputColumns[1]);
                        tempTable.LoadDataRow(inputColumns, false);
                    }
                }
                Console.WriteLine("{0} Done reading data file; {1} records read into memory", DateTime.Now, tempTable.Rows.Count);
            }
            catch (Exception exception)
            {
                Console.WriteLine("Error: {0}", exception.Message);
                returnCode = 5;
            }

            try
            {
                startTime = DateTime.Now;
                Console.WriteLine("{0} Connecting to the Database...", DateTime.Now);

                using (SqlConnection dbConnection = new SqlConnection("Server=localhost;Database=Test;Trusted_Connection=True;"))
                {
                    dbConnection.Open();

                    SqlCommand importProc = new SqlCommand("ImportDataTVP", dbConnection);
                    importProc.CommandType = CommandType.StoredProcedure;
                    importProc.CommandTimeout = 300;
                    // The DataTable itself is the parameter value
                    SqlParameter importTable = importProc.Parameters.AddWithValue("@ImportTable", tempTable);
                    importTable.TypeName = "dbo.ImportStructure";
                    importTable.SqlDbType = SqlDbType.Structured;

                    Console.WriteLine("{0} Calling proc to import the data...", DateTime.Now);
                    importProc.ExecuteNonQuery();
                    elapsedSeconds = ((Convert.ToDouble(DateTime.Now.Ticks - startTime.Ticks)) / 10000000.0D);
                    Console.WriteLine("{0} Proc is done importing the data via single TVP ({1} Seconds)", DateTime.Now, elapsedSeconds);
                }
            }
            catch (Exception exception)
            {
                Console.WriteLine("Error: {0}", exception.Message);
                returnCode = 6;
            }

            return returnCode;
        }

        // Option "half": read the whole file, then stream the rows out of the DataTable to the TVP proc
        static int HalfStreamedImport(string FilePath)
        {
            int returnCode = 0;
            DateTime startTime;
            DataTable tempTable = new DataTable();
            double elapsedSeconds;

            tempTable.Columns.Add("SillyIDField", typeof(Int32));
            tempTable.Columns.Add("MeaninglessText", typeof(string));

            try
            {
                Console.WriteLine("{0} Starting to read data file...", DateTime.Now);
                // "using" disposes the reader; a Close() in a finally block would throw
                // a NullReferenceException if the file could not be opened
                using (StreamReader fileReader = new StreamReader(FilePath))
                {
                    string inputRow = "";
                    string[] inputColumns = new string[2];

                    while (!fileReader.EndOfStream)
                    {
                        inputRow = fileReader.ReadLine();
                        inputColumns = inputRow.Split('\t');

                        //Console.WriteLine("Field1 = {0} ; Field2 = {1}", inputColumns[0], inputColumns[1]);
                        tempTable.LoadDataRow(inputColumns, false);
                    }
                }
                Console.WriteLine("{0} Done reading data file; {1} records read into memory", DateTime.Now, tempTable.Rows.Count);
            }
            catch (Exception exception)
            {
                Console.WriteLine("Error: {0}", exception.Message);
                returnCode = 5;
            }

            try
            {
                startTime = DateTime.Now;

                Console.WriteLine("{0} Connecting to the Database...", startTime);
                using (SqlConnection dbConnection = new SqlConnection("Server=localhost;Database=Test;Trusted_Connection=True;"))
                {
                    dbConnection.Open();

                    SqlCommand importProc = new SqlCommand("ImportDataTVP", dbConnection);
                    importProc.CommandType = CommandType.StoredProcedure;
                    importProc.CommandTimeout = 300;
                    SqlParameter importTable = new SqlParameter();
                    importTable.ParameterName = "@ImportTable";
                    importTable.TypeName = "dbo.ImportStructure";
                    importTable.SqlDbType = SqlDbType.Structured;
                    // The parameter value is an enumerable of SqlDataRecord rather than a DataTable
                    importTable.Value = new HalfStreamingDataRecord(tempTable);
                    importProc.Parameters.Add(importTable);

                    Console.WriteLine("{0} Calling proc to import the data...", DateTime.Now);
                    importProc.ExecuteNonQuery();
                    elapsedSeconds = ((Convert.ToDouble(DateTime.Now.Ticks - startTime.Ticks)) / 10000000.0D);
                    Console.WriteLine("{0} Proc is done importing the data via Streamed TVP ({1} Seconds)", DateTime.Now, elapsedSeconds);
                }
            }
            catch (Exception exception)
            {
                Console.WriteLine("Error: {0}", exception.Message);
                returnCode = 6;
            }

            return returnCode;
        }

        // Option "full": the file is read row by row while the TVP proc call is in progress
        static int FullStreamedImport(string FilePath)
        {
            int returnCode = 0;
            DateTime startTime;
            double elapsedSeconds;

            try
            {
                startTime = DateTime.Now;

                Console.WriteLine("{0} Connecting to the Database...", startTime);
                using (SqlConnection dbConnection = new SqlConnection("Server=localhost;Database=Test;Trusted_Connection=True;"))
                {
                    dbConnection.Open();

                    SqlCommand importProc = new SqlCommand("ImportDataTVP", dbConnection);
                    importProc.CommandType = CommandType.StoredProcedure;
                    importProc.CommandTimeout = 300;
                    SqlParameter importTable = new SqlParameter();
                    importTable.ParameterName = "@ImportTable";
                    importTable.TypeName = "dbo.ImportStructure";
                    importTable.SqlDbType = SqlDbType.Structured;
                    // The file is not opened here; it is read as the rows are enumerated during execution
                    importTable.Value = new FullStreamingDataRecord(FilePath);
                    importProc.Parameters.Add(importTable);

                    Console.WriteLine("{0} Calling proc to read and import the data...", DateTime.Now);
                    importProc.ExecuteNonQuery();
                    elapsedSeconds = ((Convert.ToDouble(DateTime.Now.Ticks - startTime.Ticks)) / 10000000.0D);
                    Console.WriteLine("{0} -- Proc is done reading and importing the data via Streamed TVP ({1} Seconds)", DateTime.Now, elapsedSeconds);
                }
            }
            catch (Exception exception)
            {
                Console.WriteLine("Error: {0}", exception.Message);
                returnCode = 7;
            }

            return returnCode;
        }
    }

    // Streams rows that are already in memory (in a DataTable) to the TVP one record at a time
    public class HalfStreamingDataRecord : IEnumerable<SqlDataRecord>
    {
        DataTable tempTable;

        public HalfStreamingDataRecord(DataTable TempTable)
        {
            tempTable = TempTable;
        }

        public IEnumerator<SqlDataRecord> GetEnumerator()
        {
            // Column layout of each record: an INT and a VARCHAR(50)
            SqlMetaData[] columnStructure = new SqlMetaData[2];
            columnStructure[0] = new SqlMetaData("SillyIDField", SqlDbType.Int);
            columnStructure[1] = new SqlMetaData("MeaninglessText", SqlDbType.VarChar, 50);

            for (int rowNum = 0; rowNum < tempTable.Rows.Count; rowNum++)
            {
                SqlDataRecord dataRecord = new SqlDataRecord(columnStructure);
                dataRecord.SetInt32(0, (int)tempTable.Rows[rowNum].ItemArray.GetValue(0));
                dataRecord.SetString(1, (string)tempTable.Rows[rowNum].ItemArray.GetValue(1));
                yield return dataRecord;
            }
        }

        IEnumerator IEnumerable.GetEnumerator()
        {
            return GetEnumerator();
        }
    }

    // Reads the file one line at a time and hands each line to the TVP as it is read
    public class FullStreamingDataRecord : IEnumerable<SqlDataRecord>
    {
        string filePath;

        public FullStreamingDataRecord(string FilePath)
        {
            filePath = FilePath;
        }

        public IEnumerator<SqlDataRecord> GetEnumerator()
        {
            // Column layout of each record: an INT and a VARCHAR(50)
            SqlMetaData[] columnStructure = new SqlMetaData[2];
            columnStructure[0] = new SqlMetaData("SillyIDField", SqlDbType.Int);
            columnStructure[1] = new SqlMetaData("MeaninglessText", SqlDbType.VarChar, 50);

            // A try block that contains "yield return" cannot have a catch clause;
            // "using" takes care of closing the reader when enumeration ends
            using (StreamReader fileReader = new StreamReader(filePath))
            {
                string inputRow = "";
                string[] inputColumns = new string[2];

                while (!fileReader.EndOfStream)
                {
                    inputRow = fileReader.ReadLine();
                    inputColumns = inputRow.Split('\t');
                    SqlDataRecord dataRecord = new SqlDataRecord(columnStructure);
                    dataRecord.SetInt32(0, Int32.Parse(inputColumns[0]));
                    dataRecord.SetString(1, inputColumns[1]);
                    yield return dataRecord;
                }
            }
        }

        IEnumerator IEnumerable.GetEnumerator()
        {
            return GetEnumerator();
        }
    }

}
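
    The comment in FullStreamingDataRecord about no catch block being allowed reflects a C# rule: a "yield return" statement may sit inside a try block that has a finally clause (or inside a "using" block), but not inside one that has a catch clause. The cleanup still runs when the consumer stops early, because disposing the enumerator executes any pending finally blocks. The sketch below shows that behavior in isolation; TrackingReader and IteratorCleanup are hypothetical names used only for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Hypothetical reader that records when it has been disposed.
public sealed class TrackingReader : StringReader
{
    public bool Disposed { get; private set; }

    public TrackingReader(string text) : base(text) { }

    protected override void Dispose(bool disposing)
    {
        Disposed = true;
        base.Dispose(disposing);
    }
}

public static class IteratorCleanup
{
    // Yields the reader's lines one at a time. The "using" block compiles to
    // try/finally, so the reader is disposed when enumeration completes or
    // when the consumer disposes the enumerator part-way through.
    public static IEnumerable<string> Lines(TextReader reader)
    {
        using (reader)
        {
            string line;
            while ((line = reader.ReadLine()) != null)
            {
                yield return line;
            }
        }
    }
}
```

    A foreach loop that breaks after the first line still leaves the reader disposed, since foreach disposes its enumerator on exit.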

