Resolving the Uncleaned Data issue for the YouTube Analysis DE Project

Deepak Dewani
7 min read · Apr 24, 2024

If you’ve arrived at this article, it likely means you’re currently engaged in an end-to-end data engineering project utilizing Python, AWS, and PySpark for YouTube analysis. You might be encountering an issue related to uncleaned data, resulting in an error message such as “HIVE_CURSOR_ERROR: Row is not a valid JSON Object — JSONException: A JSONObject text must end with ‘}’ at 2 [character 3 line 1].”

Architecture for the process of cleaning the data

This error typically surfaces after generating the metadata and attempting to execute queries in AWS Athena. It stems from Athena’s JSON SerDe expecting each record to be a complete JSON object on a single line. If your data is structured differently, for example as pretty-printed objects spanning multiple lines, queries fail with errors like the one described above.
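To make the difference concrete, here is a minimal Python sketch. The field names are only illustrative (loosely based on the shape of the YouTube category file), and the point is purely the layout: a multi-line object breaks the SerDe, a single-line record does not.

import json

record = {"kind": "youtube#videoCategoryListResponse", "etag": "example-etag"}

# Pretty-printed JSON spans several lines, so the SerDe reads the first
# line ("{") as a record on its own and raises HIVE_CURSOR_ERROR.
pretty = json.dumps(record, indent=4)

# JSON Lines keeps each record as one complete object on a single line,
# which is the layout Athena's JSON SerDe can parse.
single_line = json.dumps(record)

print(pretty)       # multiple lines per record -> fails in Athena
print(single_line)  # one line per record -> queryable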

If you’re new to this article, I recommend reviewing the End-to-End Data Engineering Project using Python, AWS, and PySpark for YouTube analysis beforehand, as it provides context for the issues discussed here.

To resolve this issue, you need to clean and restructure the data. Rather than rewriting the JSON by hand so that each record sits on a single line, we will convert the JSON data to Parquet format. Once the data is cleaned and stored appropriately, you can run queries on it in Athena without encountering errors, ensuring smooth execution of queries and accurate analysis of your data.

Adding an Output Location for AWS Athena

  1. Go to AWS Athena and open “Settings”; you will see the screen shown below.
Athena Settings page

  2. Add the S3 bucket path (create a new S3 bucket to store all your query results) under “Query result location”; see the boto3 sketch below for a per-query alternative.
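If you prefer to set the output location per query instead of relying only on this console setting, a minimal boto3 sketch (with a placeholder bucket name) looks like this:

import boto3

athena = boto3.client('athena')

# Run a trivial query, directing its results to the dedicated results bucket.
response = athena.start_query_execution(
    QueryString='SELECT 1',
    ResultConfiguration={'OutputLocation': 's3://your-athena-query-results-bucket/'}  # placeholder bucket
)
print(response['QueryExecutionId'])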

Writing the ETL Code to Convert JSON to Parquet Format Using AWS Lambda

To begin, navigate to the AWS Management Console and search for AWS Lambda. Once located, open the Lambda service. On the Lambda page, click on “Create function.” Choose “Author from scratch” and assign a unique name to your function. Select your preferred runtime language, such as Python 3.8, and keep the architecture as x86_64.

Next, click on “Change default execution role” to configure permissions. You’ll need to create a role in IAM that allows Lambda to communicate with the other AWS services involved. Head to IAM, open “Roles,” and click on “Create role.” Select “Lambda” as the service that will use this role. Attach the “AmazonS3FullAccess” and “AWSGlueServiceRole” managed policies to the role, then save it.

Return to the Lambda page, select the newly created role, review the settings, and save the changes. This configuration ensures that your Lambda function has the necessary permissions to interact with AWS services like S3 and Glue.
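If you would rather script the role creation, a minimal boto3 sketch is below. The role name is a placeholder, and the attached policies are the AWS-managed AmazonS3FullAccess and AWSGlueServiceRole policies mentioned above; adjust both to your own setup and security requirements.

import json
import boto3

iam = boto3.client('iam')

# Trust policy that lets the Lambda service assume this role.
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"Service": "lambda.amazonaws.com"},
        "Action": "sts:AssumeRole"
    }]
}

iam.create_role(
    RoleName='youtube-de-lambda-role',  # placeholder role name
    AssumeRolePolicyDocument=json.dumps(trust_policy)
)

# Attach the managed policies used in this article.
for policy_arn in [
    'arn:aws:iam::aws:policy/AmazonS3FullAccess',
    'arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole',
]:
    iam.attach_role_policy(RoleName='youtube-de-lambda-role', PolicyArn=policy_arn)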

In the Lambda function, you’ll need to write code to process the uncleaned data into a cleaned format. Below is a sample code snippet to achieve this:

import awswrangler as wr
import pandas as pd
import urllib.parse
import os

# Target locations and Glue catalog details, read from the Lambda environment variables
os_input_s3_cleansed_layer = os.environ['s3_cleansed_layer']
os_input_glue_catalog_db_name = os.environ['glue_catalog_db_name']
os_input_glue_catalog_table_name = os.environ['glue_catalog_table_name']
os_input_write_data_operation = os.environ['write_data_operation']


def lambda_handler(event, context):
    # Get the bucket and object key from the S3 event notification
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')

    try:
        # Create a DataFrame from the raw JSON object
        df_raw = wr.s3.read_json('s3://{}/{}'.format(bucket, key))

        # Extract the required columns by flattening the nested "items" array
        df_step_1 = pd.json_normalize(df_raw['items'])

        # Write the cleaned data to S3 as Parquet and register it in the Glue Data Catalog
        wr_response = wr.s3.to_parquet(
            df=df_step_1,
            path=os_input_s3_cleansed_layer,
            dataset=True,
            database=os_input_glue_catalog_db_name,
            table=os_input_glue_catalog_table_name,
            mode=os_input_write_data_operation
        )

        return wr_response
    except Exception as e:
        print(e)
        print('Error getting object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.'.format(key, bucket))
        raise e

To ensure the smooth functioning of our code, we need to set up environment variables in the Lambda function configuration. Follow these steps:

  1. Navigate to the Lambda function configuration and locate the “Environment variables” section.
  2. Click on “Edit” to modify the environment variables.
  3. Add the following environment variables:
  • glue_catalog_db_name: Set this to the name of the database in AWS Glue Data Catalog. It's recommended to create a new database in Athena with the same name.
  • glue_catalog_table_name: Assign this variable the name of the table in the Glue Data Catalog where cleaned data will be stored. For example, "cleaned_statistics_reference_data".
  • s3_cleansed_layer: Specify the S3 path where cleaned data will be stored after execution. It's recommended to create a new bucket in S3 for this purpose.
  • write_data_operation: Set this variable to define the operation to perform when writing data. For example, "append".

Ensure that the values assigned to these environment variables align with your specific setup and requirements. This setup enables the Lambda function to utilize these variables during execution.
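These variables can also be set programmatically. Here is a minimal boto3 sketch; the function name, database name, and S3 path are placeholders for your own values, while the table name and write mode match the examples above.

import boto3

lambda_client = boto3.client('lambda')

# Set the environment variables the handler reads at import time.
lambda_client.update_function_configuration(
    FunctionName='youtube-de-json-to-parquet',  # placeholder function name
    Environment={
        'Variables': {
            'glue_catalog_db_name': 'db_youtube_cleaned',                    # placeholder database name
            'glue_catalog_table_name': 'cleaned_statistics_reference_data',
            's3_cleansed_layer': 's3://your-cleansed-bucket/youtube/',       # placeholder S3 path
            'write_data_operation': 'append',
        }
    }
)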

To test our code, we first need to deploy it. Follow these steps:

  1. Navigate to the Lambda function configuration and click on the “Code” section.
  2. Click on the “Deploy” button to deploy the latest code changes.
  3. Next, click on the drop-down menu near “TEST” and select “Configure test events.”
  4. In the Test event configuration, select “Create a new test event.”
  5. Give the event a name, such as “S3 PUT.”
  6. Update the event JSON with your bucket name in place of “example-bucket” under the “S3” dictionary for the key “bucket.”
  7. Also, update the file path of a JSON data file under the “object” dictionary for the key “Key,” for example, “youtube/raw_statistics_reference_data/US_category_id.json.”

For reference, here is the test event JSON:

{
  "Records": [
    {
      "eventVersion": "2.0",
      "eventSource": "aws:s3",
      "awsRegion": "us-east-1",
      "eventTime": "1970-01-01T00:00:00.000Z",
      "eventName": "ObjectCreated:Put",
      "userIdentity": {
        "principalId": "EXAMPLE"
      },
      "requestParameters": {
        "sourceIPAddress": "127.0.0.1"
      },
      "responseElements": {
        "x-amz-request-id": "EXAMPLE123456789",
        "x-amz-id-2": "EXAMPLE123/5678abcdefghijklambdaisawesome/mnopqrstuvwxyzABCDEFGH"
      },
      "s3": {
        "s3SchemaVersion": "1.0",
        "configurationId": "testConfigRule",
        "bucket": {
          "name": "youtube-project-bucket(your bucket name)",
          "ownerIdentity": {
            "principalId": "EXAMPLE"
          },
          "arn": "arn:aws:s3:::youtube-project-bucket(your bucket name)"
        },
        "object": {
          "key": "youtube/raw_statistics_reference_data/US_category_id.json",
          "size": 1024,
          "eTag": "0123456789abcdef0123456789abcdef",
          "sequencer": "0A1B2C3D4E5F678901"
        }
      }
    }
  ]
}

Now, click on the “Test” button to run the function and check whether it works. After testing, you will encounter two errors, which we will solve one by one.

Error Resolution

First Error

{ "errorMessage": "Unable to import module 'lambda_function': No module named 'awswrangler'", "errorType": "Runtime.ImportModuleError", "stackTrace": [] }

AWS Wrangler import error

To resolve this error, we need to make the awswrangler module available inside the AWS Lambda function by attaching an AWS-managed layer. Follow these steps:

  1. In the Lambda function’s “Code” tab, scroll down to the bottom of the page.
  2. Find the “Layers” section and click “Add a layer.”
  3. From the drop-down menu of AWS layers, search for “AWSSDKPandas-Python38.”
  4. Select layer version “18” or the latest available version.
  5. Click on “Add” to attach the layer to your Lambda function.

This makes the awswrangler module (shipped in the AWS SDK for pandas layer) available to the Lambda function, giving it the functionality it needs to process the data effectively.
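If you want to attach the layer without the console, a hedged boto3 sketch follows. The layer ARN shown is only a placeholder pattern for the AWS-managed AWSSDKPandas layer; copy the exact ARN from the console, since it is region-specific and versioned.

import boto3

lambda_client = boto3.client('lambda')

# Attach the managed pandas/awswrangler layer to the function.
lambda_client.update_function_configuration(
    FunctionName='youtube-de-json-to-parquet',  # placeholder function name
    Layers=[
        # Placeholder ARN pattern; use the exact AWSSDKPandas-Python38 ARN for your region.
        'arn:aws:lambda:us-east-1:<aws-managed-account-id>:layer:AWSSDKPandas-Python38:18'
    ]
)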

Second Error

Timeout error

To resolve the error, we should increase the timeout duration for the Lambda function. Follow these steps:

  1. Go to the Lambda function configuration.
  2. Under “Configuration,” locate the “General configuration” section and click on “Edit.”
  3. Increase the “Memory” from 128 MB to 512 MB to ensure optimal performance.
  4. Next, adjust the “Timeout” from “3 sec” to “10 min 3 Sec” to provide sufficient time for the function to execute.
  5. Click on “Save” to apply the changes.
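The same settings can be applied with boto3 if you prefer scripting; here is a minimal sketch, with the function name as a placeholder:

import boto3

lambda_client = boto3.client('lambda')

# Raise the memory and timeout so the conversion can finish.
lambda_client.update_function_configuration(
    FunctionName='youtube-de-json-to-parquet',  # placeholder function name
    MemorySize=512,  # MB
    Timeout=603      # seconds (10 min 3 sec, matching the console value above)
)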

Increasing the memory allocation and timeout duration addresses these performance constraints and ensures that the Lambda function completes its execution without timing out. With the changes outlined above in place, the Lambda function should execute successfully, and you should receive a response similar to the following:

{
  "paths": [
    "s3://yourbucket/youtube/0683331c1d33466e940cf51c4d535a77c.snappy.parquet"
  ],
  "partitions_values": {}
}

Indeed, the response indicates the successful completion of the Lambda function execution, and it lists the S3 paths where the processed data was written. The cleaned data is stored in Parquet format within your designated S3 bucket. Navigate to the new S3 bucket where the cleaned data is stored, and you should see something like the following:

Cleaned Data in Parquet Format
Cleaned Data in Parquet Format

Parquet is a columnar storage format optimized for analytics workloads. Storing data in Parquet format in S3 enhances query performance and reduces storage costs, making it a popular choice for data analytics and processing pipelines in AWS environments.

Ensure to check your cleaned S3 bucket to confirm that the data has been stored in the expected Parquet format.
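For a quick spot-check from Python, you could read the Parquet dataset back with awswrangler; the S3 path below is a placeholder for your own cleansed bucket.

import awswrangler as wr

# Read the cleansed dataset straight from S3 and inspect its schema.
df = wr.s3.read_parquet('s3://your-cleansed-bucket/youtube/', dataset=True)  # placeholder path
print(df.dtypes)
print(df.head())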

After confirming that the data has been stored in the expected Parquet format in your S3 bucket, you can proceed to check the Glue Data Catalog. Under “Database/tables,” you should find the table named “cleaned_statistics_reference_data.” This table will contain the cleaned metadata, including its columns and data types.

You can click on “View data” to inspect the data within Glue. Alternatively, navigate to Athena and run queries to view the structured version of your cleaned data. Athena provides a convenient interface for querying data stored in S3 using SQL, allowing you to analyze and explore your cleaned dataset effectively.
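You can also run that Athena query from Python with awswrangler. Here is a minimal sketch, assuming the table name used earlier in this article; the database name is a placeholder for your own Glue Data Catalog database.

import awswrangler as wr

# Query the cleaned table through Athena and get the result as a DataFrame.
df = wr.athena.read_sql_query(
    sql='SELECT * FROM cleaned_statistics_reference_data LIMIT 10',
    database='db_youtube_cleaned'  # placeholder: use your Glue Data Catalog database name
)
print(df.head())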

Structured version of the data

Resolving the error caused by uncleaned data and being able to query your dataset is a significant milestone. Now that you’ve successfully addressed this issue, you can confidently proceed with the rest of your end-to-end data engineering project using Python, AWS, and PySpark for YouTube analysis. If you encounter any more challenges or need further assistance along the way, feel free to reach out. Good luck with your project, and happy analyzing!
