DataFrames in Apache Spark - Java Spark API

In this article we will see what DataFrames are in Apache Spark, how to create them, and the operations they support, using the Spark Java API. We will also look at how to create a DataFrame from different sources such as an RDD, a Java List, JSON, MySQL and more.

What is a Spark DataFrame (Untyped Dataset)?

DataFrames were added in Spark 1.3. A DataFrame is a type of Dataset organised into named columns, conceptually equivalent to a table in a relational database.

A DataFrame is represented by a Dataset of rows (Dataset<Row>) and carries additional metadata due to its tabular format, which allows Spark to run certain optimizations on the finalised query.

Along with DataFrames, Spark also introduced the Catalyst optimizer, which leverages advanced programming language features to build an extensible query optimizer.

What is SparkSession?

SparkSession was added in Spark 2.x as a replacement for SQLContext and HiveContext. Since 2.x, SparkSession can be used both for structured data (Dataset and DataFrame) operations and for Hive-related SQL operations.

A SparkSession object can be created as shown below:
SparkSession sparkSession = SparkSession.builder().getOrCreate();
A Hive-enabled SparkSession object can be created as shown below:
SparkSession sparkSession = SparkSession.builder().enableHiveSupport().getOrCreate();
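In practice, when running outside spark-submit (for example from an IDE), you may also want to set an application name and master. A minimal sketch for local development; the app name and master value below are illustrative choices, not requirements of the API:
SparkSession sparkSession = SparkSession.builder()
		.appName("dataframe-examples") /* hypothetical application name */
		.master("local[*]") /* run locally using all available cores */
		.getOrCreate();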

How to create a DataFrame in Spark?

DataFrames can be created using SparkSession from an existing RDD, a Hive table, databases such as MySQL via JDBC drivers, or from Spark data sources like Parquet, JSON and ORC.

RDD to DataFrame in Spark

In the Spark Java API, a DataFrame can be obtained from an existing JavaRDD as shown below:
	public <T> Dataset<Row> dataFrame(JavaRDD<T> javaRdd, Class<T> clazz) {
		/* Spark infers the schema by reflecting on the Java bean class */
		Dataset<Row> dataFrame = sparkSession.createDataFrame(javaRdd, clazz);
		return dataFrame;
	}
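For completeness, a hypothetical call to this method, assuming the Geo bean used later in this article and the usual imports (java.util.Arrays, org.apache.spark.api.java.JavaSparkContext):
JavaSparkContext jsc = new JavaSparkContext(sparkSession.sparkContext());
JavaRDD<Geo> geoRdd = jsc.parallelize(Arrays.asList(
		new Geo(1, "London", 20),
		new Geo(2, "New York", 30)));
Dataset<Row> geoDf = dataFrame(geoRdd, Geo.class);
geoDf.show();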

Read JSON to DataFrame in Spark

In the Spark Java API, a DataFrame can be obtained from a .json file as shown below:
	public Dataset<Row> dataFrame(String jsonFilePath) {
		/* By default Spark reads JSON Lines input: one JSON object per line */
		Dataset<Row> dataFrame = sparkSession.read().json(jsonFilePath);
		return dataFrame;
	}
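Note that by default Spark expects JSON Lines input, one self-contained JSON object per line rather than a single pretty-printed document. An illustrative input file matching the Geo examples used later in this article:
{"cityId": 1, "cityName": "London", "hits": 20}
{"cityId": 2, "cityName": "New York", "hits": 30}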

Read Apache Parquet to DataFrame in Spark

Apache Parquet is a columnar storage format available to any project in the Hadoop ecosystem, regardless of the choice of data processing framework, data model or programming language.

In the Spark Java API, a DataFrame can be obtained from a .parquet file as shown below:
	public Dataset<Row> dataFrameParquet(String parquetFilePath) {
		/* load() needs no explicit format here: parquet is Spark's default data source */
		Dataset<Row> dataFrame = sparkSession.read().load(parquetFilePath);
		return dataFrame;
	}
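The call above works because Parquet is Spark's default data source; the equivalent explicit form is:
Dataset<Row> dataFrame = sparkSession.read().parquet(parquetFilePath);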

How to load a MySQL table into a DataFrame in Spark?

In the Spark Java API, a DataFrame can be obtained from a MySQL table, or any other JDBC source, as shown below:
	/*
	 * dbUrl - jdbc:mysql://127.0.0.1:3306/techdb
	 * tableNameWithSchema - techdb.articles
	 * userName - tech
	 * password - burps
	 */
	public Dataset<Row> dataFrame(String dbUrl, String tableNameWithSchema, String userName, String password) {
		Dataset<Row> dataFrame = sparkSession.read()
				  .format("jdbc")
				  .option("url", dbUrl)
				  .option("dbtable", tableNameWithSchema)
				  .option("user", userName)
				  .option("password", password)
				  .load();
		return dataFrame;
	}
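Note that the MySQL JDBC driver (the Connector/J JAR) must be available on the Spark classpath for this to work. A hypothetical call using the illustrative values from the comment above:
Dataset<Row> articlesDf = dataFrame("jdbc:mysql://127.0.0.1:3306/techdb",
		"techdb.articles", "tech", "burps");
articlesDf.show();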

How to create a DataFrame from a Java List in Spark?

In the Spark Java API, a DataFrame can be obtained from a Java List as shown below:
	public <T> Dataset<Row> dataFrame(List<T> pageViewsList, Class<T> clazz) {
		Dataset<Row> dataFrame = sparkSession.createDataFrame(pageViewsList, clazz);
		return dataFrame;
	}

Spark's DataFrame operations

Now, let's look at some basic operations that can be performed on DataFrames. We will use some dummy data for this purpose, as shown below:
		/* Dummy data */
		List<Geo> dummy = new ArrayList<>();
		
		dummy.add(new Geo(1, "London", 20));
		dummy.add(new Geo(2, "New York", 30));
		dummy.add(new Geo(3, "Delhi", 40));
		dummy.add(new Geo(3, "Delhi", 60));
Let's create a DataFrame from this data with the help of SparkSession; we will use the same DataFrame to perform the different operations:
SparkSession sparkSession = SparkSession.builder().enableHiveSupport().getOrCreate();
Dataset<Row> dataFrame = sparkSession.createDataFrame(dummy, Geo.class);
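The Geo class itself is not shown in this article; based on the constructor calls above and the schema printed below, a minimal Java bean sketch could look like this (field names and types are inferred, not taken from an original listing):
/* A minimal sketch of the Geo bean assumed by these examples */
public class Geo implements java.io.Serializable {
	private int cityId;
	private String cityName;
	private long hits;

	public Geo() {} /* no-arg constructor required for bean reflection */

	public Geo(int cityId, String cityName, long hits) {
		this.cityId = cityId;
		this.cityName = cityName;
		this.hits = hits;
	}

	public int getCityId() { return cityId; }
	public void setCityId(int cityId) { this.cityId = cityId; }
	public String getCityName() { return cityName; }
	public void setCityName(String cityName) { this.cityName = cityName; }
	public long getHits() { return hits; }
	public void setHits(long hits) { this.hits = hits; }
}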

How to print DataFrame content on the console

We can use .show() to display the content of the DataFrame on stdout, as shown below:
		/* Show complete data of dataframe*/
		dataFrame.show();

		/* Show specific number of rows of dataframe*/
		dataFrame.show(2);
Output: The following output will be shown on stdout:
+------+--------+----+
|cityId|cityName|hits|
+------+--------+----+
|     1|  London|  20|
|     2|New York|  30|
|     3|   Delhi|  40|
|     3|   Delhi|  60|
+------+--------+----+

+------+--------+----+
|cityId|cityName|hits|
+------+--------+----+
|     1|  London|  20|
|     2|New York|  30|
+------+--------+----+
only showing top 2 rows

How to print the DataFrame schema on the console

We can use .printSchema() to display the schema of the DataFrame on stdout, as shown below:
		/* Print the schema of the dataframe */
		dataFrame.printSchema();
Output: The following output will be shown on stdout:
root
 |-- cityId: integer (nullable = false)
 |-- cityName: string (nullable = true)
 |-- hits: long (nullable = false)

How to filter rows in a Spark DataFrame

We can use .filter() to filter the rows of a DataFrame, as shown below:
dataFrame.filter(col("cityId").eqNullSafe(3)).show();
Please don't forget to add the import "import static org.apache.spark.sql.functions.col;" in your class.

Output: The following output will be shown on stdout:
+------+--------+----+
|cityId|cityName|hits|
+------+--------+----+
|     3|   Delhi|  40|
|     3|   Delhi|  60|
+------+--------+----+
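.filter() also accepts a SQL expression string; the following is equivalent:
dataFrame.filter("cityId = 3").show();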

Running SQL Queries on DataFrames in Spark

In the Spark Java API, SparkSession enables applications to run SQL queries programmatically and return the result as a Dataset<Row>. In order to be queried, a DataFrame needs to be registered as a temporary SQL view:
		Dataset<Row> dataFrame = sparkSession.createDataFrame(dummy, Geo.class);
		dataFrame.show();
		
		dataFrame.createOrReplaceTempView("GEO_VIEW");

		Dataset<Row> queryResultDf = sparkSession.sql("SELECT * FROM GEO_VIEW where cityId = 1");
		queryResultDf.show();
Output: The following output will be shown on stdout:
+------+--------+----+
|cityId|cityName|hits|
+------+--------+----+
|     1|  London|  20|
|     2|New York|  30|
|     3|   Delhi|  40|
|     3|   Delhi|  60|
+------+--------+----+

+------+--------+----+
|cityId|cityName|hits|
+------+--------+----+
|     1|  London|  20|
+------+--------+----+
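Since GEO_VIEW is a regular temporary view, any query that Spark SQL supports can be run against it; for example, an illustrative aggregation that sums the hits of the duplicate Delhi rows:
Dataset<Row> aggregated = sparkSession.sql(
		"SELECT cityName, SUM(hits) AS totalHits FROM GEO_VIEW GROUP BY cityName");
aggregated.show();
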
In this article we have seen what DataFrames are in Spark, their properties, and how to create and operate on them using the Spark Java API.