Batch Prediction
The batch prediction job will be executed as a Spark Application running in a Spark cluster on top of Kubernetes.
Prediction Job
A Prediction Job is the resource introduced in Merlin for executing batch prediction. A Prediction Job is owned by the corresponding Model Version; one Model Version can have several Prediction Jobs, and the history of all jobs ever created is maintained. A Prediction Job has several important properties:
- Id: unique ID of the prediction job.
- Model / Model Version: reference to the model version from which the prediction job was created.
- Config: the source, sink, and secret configuration of the prediction job. It can also contain additional configuration for resource requests or Spark-specific settings.
- State: current state of the prediction job (see the Lifecycle section).
- Error: detailed error message if the prediction job is unsuccessful.
- Logs: link to the log location.
- Monitoring URL: link to the monitoring dashboard.
Lifecycle
A Prediction Job goes through several states during its lifetime:
- Pending: the prediction job enters this state once it is created/submitted. It moves to the running state if the Spark application starts successfully; otherwise it enters the failed state.
- Running: the prediction job moves to this state once the underlying Spark application executing it has been created. It remains in this state until the Spark application completes (in which case it moves to the completed state) or fails (in which case it enters the failed state). A user can manually stop the prediction job, in which case it enters the terminating state.
- Completed: the prediction job enters this state if it completes successfully.
- Failed: any failure that prevents the prediction job from completing moves it to this state.
- Terminating: the prediction job enters this state when a user manually cancels a pending/running job.
- Terminated: once the termination process is complete, the prediction job enters this state.
Creating Secret/Service Account
To run a Prediction Job, you'll need a service account whose key is stored inside the MLP project using the secret management API. The service account must have the following roles:
- BigQuery Job User (`roles/bigquery.jobUser`) in the project where the service account is created
- BigQuery Read Session User (`roles/bigquery.readSessionUser`) in the project where the service account is created
- BigQuery Data Viewer (`roles/bigquery.dataViewer`) in the source dataset
- BigQuery Data Editor (`roles/bigquery.dataEditor`) in the destination dataset
- Storage Writer (`roles/storage.legacyBucketWriter`)
- Storage Object Admin (`roles/storage.objectAdmin`)
Configuring Source
You can specify the source configuration of your prediction job by creating an instance of `BigQuerySource`. Its constructor accepts the following parameters:
- `table`: source table ID in the format `gcp_project.dataset_name.table_name`.
- `features`: list of features to be used for the prediction; the names must match column names in the source table.
- `options`: dictionary containing additional options that can be used to customize the source. The available options are listed below.
| Option | Description |
| --- | --- |
| `parentProject` | The Google Cloud project ID of the table to bill for the export. (Optional. Defaults to the project of the service account being used.) |
| `maxParallelism` | The maximal number of partitions to split the data into. (Optional.) |
| `viewsEnabled` | Enables the connector to read from views and not only tables; must be set to `true` when the source is a view (see Reading from View below). (Optional. Defaults to `false`.) |
| `viewMaterializationProject` | The project ID where the materialized view is going to be created. (Optional. Defaults to the view's project ID.) |
| `viewMaterializationDataset` | The dataset where the materialized view is going to be created. (Optional. Defaults to the view's dataset.) |
| `readDataFormat` | Data format for reading from BigQuery. Options: `ARROW`, `AVRO`. Unsupported Arrow filters are not pushed down, and results are filtered later by Spark (currently Arrow does not support disjunction across columns). (Optional. Defaults to `AVRO`.) |
| `optimizedEmptyProjection` | The connector uses an optimized empty-projection (select without any columns) logic, used for `count()` execution. This logic takes the data directly from the table metadata, or performs a much more efficient `SELECT COUNT(*) WHERE ...` in case there is a filter. You can disable this logic by setting this option to `false`. (Optional. Defaults to `true`.) |
Source: https://github.com/GoogleCloudDataproc/spark-bigquery-connector
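For illustration, below is a minimal sketch of a source configuration, assuming the Merlin Python SDK's `merlin.batch.source.BigQuerySource`; the project, dataset, and feature names are hypothetical:

```python
from merlin.batch.source import BigQuerySource

# Read two feature columns from a source table, billing the export
# to an explicitly chosen project via the parentProject option.
source = BigQuerySource(
    table="sample-project.input_dataset.features_table",
    features=["feature_1", "feature_2"],
    options={"parentProject": "sample-billing-project"},
)
```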
Reading from View
To use a view as the data source instead of a table, you'll have to set `viewsEnabled` to `true` and specify `viewMaterializationProject` and `viewMaterializationDataset`. Since materializing the view will create a table, the service account should also have `roles/bigquery.dataEditor` in the specified dataset.
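Below is an example, again assuming the Merlin SDK's `BigQuerySource`; the view, project, and dataset names are placeholders:

```python
from merlin.batch.source import BigQuerySource

# Read from a BigQuery view; the connector materializes the view into
# a temporary table in the given project/dataset before reading it.
source = BigQuerySource(
    table="sample-project.input_dataset.my_view",
    features=["feature_1", "feature_2"],
    options={
        "viewsEnabled": "true",
        "viewMaterializationProject": "sample-project",
        "viewMaterializationDataset": "temp_dataset",
    },
)
```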
Configuring Sink
To configure the destination of the prediction job, create an instance of `BigQuerySink`. The class accepts the following parameters:
- `table`: destination table ID in the format `gcp_project.dataset_name.table_name`.
- `staging_bucket`: GCS staging bucket used as temporary storage for the prediction result before loading it into the destination table.
- `result_column`: column name in the destination table that will store the prediction result. Note that it has to be a single string, not a list of strings, even if you specify `ARRAY` as the result type.
- `save_mode`: specifies the expected behavior of saving the prediction result into the destination table. The possible values are:
  - `ERRORIFEXISTS`: throw an error if the destination table already exists (default).
  - `OVERWRITE`: overwrite the destination table if it exists.
  - `APPEND`: append the new result to the destination table if it exists.
  - `IGNORE`: do not write the prediction result if the destination table exists.
- `options`: dictionary of strings that can be used to specify additional configuration. The available options are listed below.
| Option | Description |
| --- | --- |
| `createDisposition` | Specifies whether the job is allowed to create new tables. The permitted values are `CREATE_IF_NEEDED` (create the table if it does not exist) and `CREATE_NEVER` (fail if the table does not exist). This option takes effect only if Spark has decided to write data to the table based on the save mode. (Optional. Defaults to `CREATE_IF_NEEDED`.) |
| `intermediateFormat` | The format of the data before it is loaded into BigQuery; values can be either `parquet` or `orc`. (Optional. Defaults to `parquet`.) On write only. |
| `partitionField` | If not set, the table is partitioned by a pseudo column, referenced via either `_PARTITIONTIME` as `TIMESTAMP` type or `_PARTITIONDATE` as `DATE` type. If set, the table is instead partitioned by this field. The field must be a top-level `TIMESTAMP` or `DATE` field, and its mode must be `NULLABLE` or `REQUIRED`. (Optional.) |
| `partitionExpirationMs` | Number of milliseconds for which to keep the storage for partitions in the table. The storage in a partition will have an expiration time of its partition time plus this value. (Optional.) |
| `partitionType` | The only supported type is `DAY`, which generates one partition per day. (Optional. Defaults to `DAY`.) |
| `clusteredFields` | Comma-separated list of non-repeated, top-level columns. Clustering is only supported for partitioned tables. (Optional.) |
| `allowFieldAddition` | Adds the `ALLOW_FIELD_ADDITION` SchemaUpdateOption to the BigQuery LoadJob. Allowed values are `true` and `false`. (Optional. Defaults to `false`.) |
| `allowFieldRelaxation` | Adds the `ALLOW_FIELD_RELAXATION` SchemaUpdateOption to the BigQuery LoadJob. Allowed values are `true` and `false`. (Optional. Defaults to `false`.) |
Source: https://github.com/GoogleCloudDataproc/spark-bigquery-connector
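For illustration, below is a minimal sketch of a sink configuration, assuming the Merlin SDK's `merlin.batch.sink.BigQuerySink` class and `SaveMode` enum; the table, bucket, and column names are hypothetical:

```python
from merlin.batch.sink import BigQuerySink, SaveMode

# Stage results in GCS, then overwrite the destination table,
# partitioning it by a date column.
sink = BigQuerySink(
    table="sample-project.output_dataset.predictions",
    staging_bucket="sample-staging-bucket",
    result_column="prediction",
    save_mode=SaveMode.OVERWRITE,
    options={"partitionField": "event_date"},
)
```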
Configuring Resource Request
The `PredictionJobResourceRequest` class is used to configure the resource request for running a prediction job. It contains several configurable parameters of the underlying Spark application. Broadly, configurations for the following Spark components are exposed:
- Driver: the driver is responsible for orchestrating the computation. It is the central coordinator that manages the execution of the Spark application.
- Executor: the executors execute the tasks assigned to them by the driver. These are the worker nodes responsible for performing the actual computations.
The executor resources play a crucial role in the performance of the prediction jobs. The driver resources, on the other hand, are more relevant to driver-intensive tasks (when a lot of aggregations are involved) and are thus less important for the prediction jobs. For both the driver and the executor, increasing the CPU allocation can result in better parallelism and faster computations. Similarly, increasing the memory allocation allows for larger amounts of data to be retained in memory (as opposed to saving chunks onto the disk) which in turn improves the processing time.
The following configurations of the driver and executor may be modified (a sketch follows the list):

- `driver_cpu_request`: driver CPU request, e.g. `1`, `1500m`, `500m`.
- `driver_memory_request`: driver memory request, e.g. `1Gi`, `512Mi`.
- `executor_cpu_request`: executor CPU request, e.g. `1`, `1500m`, `500m`.
- `executor_memory_request`: executor memory request, e.g. `1Gi`, `512Mi`.
- `executor_replica`: number of executor replicas, e.g. `1`, `2`.
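A hedged sketch, assuming the Merlin SDK's `merlin.batch.config.PredictionJobResourceRequest`; the values are hypothetical:

```python
from merlin.batch.config import PredictionJobResourceRequest

# Hypothetical sizing: four executors with 2 CPUs and 2Gi of memory
# each, and a modest driver, since this job does little aggregation.
resource_request = PredictionJobResourceRequest(
    driver_cpu_request="1",
    driver_memory_request="1Gi",
    executor_cpu_request="2",
    executor_memory_request="2Gi",
    executor_replica=4,
)
```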
The same configurations may also be applied from the UI when submitting a batch job.
Without specifying any resources, the prediction job will run with the system default configuration.
This default configuration is good enough for most cases. However, it might not be sufficient when the model is large, the source table is wide (has many columns), or the processing requires a lot of memory. In such cases, you might want to increase `executor_memory_request` to a larger value.
You might also want to make the prediction job complete faster by increasing `executor_cpu_request` and `executor_replica`; however, this will increase the cost significantly.
Note: When optimizing the resource configurations, the best values can be determined by observing the resource usage of the components in the monitoring dashboard. However, we must also account for spikes instead of simply relying on the average value over a time window.
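Putting the pieces together, a hedged end-to-end sketch, assuming the Merlin SDK's `PredictionJobConfig` class and the model version's `create_prediction_job` method; the service account name is a placeholder, and `model_version` stands for a model version object obtained elsewhere from the SDK:

```python
from merlin.batch.config import PredictionJobConfig

# Combine the source, sink, and resource request defined above,
# then submit the job against an existing model version.
job_config = PredictionJobConfig(
    source=source,
    sink=sink,
    service_account_name="batch-prediction-sa",  # MLP secret holding the SA key
    resource_request=resource_request,
)
job = model_version.create_prediction_job(job_config=job_config)
```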
Known Issues
Type Conversion Error When BQ Source Has Date Column
Symptom
A type conversion error is thrown during batch prediction execution. Check whether your BQ source table has a DATE-type column; if so, the workaround below might help.
Root Cause
https://issues.apache.org/jira/browse/SPARK-30961
Workaround
Add `pyarrow==0.11.1` and `pandas==0.24.1` to the conda `environment.yaml` of your model.
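For example, a hypothetical `environment.yaml` with the pinned versions added (keep your model's other dependencies as they are):

```yaml
# Hypothetical conda environment file; only the two pinned pip
# packages below are required by the workaround.
dependencies:
  - python=3.7
  - pip:
      - pyarrow==0.11.1
      - pandas==0.24.1
```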