Tecton is powered by Spark, which makes it easy to run complex transformations on your data and layer in real-time data from your Kafka and Kinesis streams. Extracting a training dataset is the only step that needs to run in a Spark environment, like Databricks or AWS EMR.
import tecton
Let's take a look at what kind of user data we've got stored in Tecton.
# Tecton lets you use multiple workspaces to keep production separate from testing. Here we'll use the production workspace.
ws = tecton.get_workspace('prod')
ws.get_feature_package('days_since_last_transaction').preview()
|   | user_id     | timestamp  | days_since_last |
|---|-------------|------------|-----------------|
| 0 | C811470007  | 2021-01-24 | 38 |
| 1 | C1818605013 | 2021-01-24 | 26 |
| 2 | C845012068  | 2021-01-24 | 30 |
| 3 | C1151762682 | 2021-01-24 | 30 |
| 4 | C1262770201 | 2021-01-24 | 32 |
| 5 | C820284643  | 2021-01-24 | 26 |
| 6 | C2121633740 | 2021-01-24 | 32 |
| 7 | C827961951  | 2021-01-24 | 38 |
| 8 | C744373600  | 2021-01-24 | 26 |
| 9 | C360031322  | 2021-01-24 | 38 |
ws.get_feature_package('user_age_days').preview()
|   | user_id     | timestamp  | age   |
|---|-------------|------------|-------|
| 0 | C1305004711 | 2021-01-03 | 14070 |
| 1 | C1016856028 | 2021-01-03 | 7192  |
| 2 | C1215951090 | 2021-01-03 | 7767  |
| 3 | C32393008   | 2021-01-03 | 8207  |
| 4 | C1298380324 | 2021-01-03 | 14124 |
| 5 | C577245010  | 2021-01-03 | 14910 |
| 6 | C2028036437 | 2021-01-03 | 8665  |
| 7 | C1298557761 | 2021-01-03 | 10694 |
| 8 | C684230144  | 2021-01-03 | 14860 |
| 9 | C990684641  | 2021-01-03 | 13035 |
We're generating a prediction context dataframe, i.e. a dataframe from your data store that Tecton will join with data from the feature service we created. At a minimum we need to pass a join key (or keys), the field(s) Tecton will use to match each row with features in its feature store, and a timestamp, so Tecton knows at what time the label column (`isfraud` in our case) was valid. We can optionally pass additional columns (below, the `amount` and `type_*` columns) that we expect to receive at inference time, so they can be used for training and predictions.

In this case, we're querying our transactions database for the following fields:

- `user_id`: our join key
- `timestamp`: the timestamp to use when retrieving features. (Features can and do change for a given `user_id` over time, so we want the feature values that were valid at this timestamp.)
- `isfraud`: our label, which was applied to this row at `timestamp` time.
- `amount`, `type_cash_in`, etc.: additional features that Tecton will join in and return to us without modification.

context_df = sqlContext.sql("""
SELECT
nameorig AS user_id,
isfraud,
timestamp,
amount,
type_cash_in,
type_cash_out,
type_debit,
type_payment,
type_transfer
FROM fraud.fraud_transactions_pq
  ORDER BY timestamp DESC
LIMIT 200000
""")
context_df.cache()
Let's take a look at what our context dataframe looks like.
display(context_df.limit(10))
user_id | isfraud | timestamp | amount | type_cash_in | type_cash_out | type_debit | type_payment | type_transfer |
---|---|---|---|---|---|---|---|---|
C65073211 | 0 | 2020-12-30T09:33:48.558+0000 | 202021.06 | 0 | 1 | 0 | 0 | 0 |
C958758156 | 0 | 2020-12-30T09:33:47.904+0000 | 417376.96 | 0 | 1 | 0 | 0 | 0 |
C180934013 | 0 | 2020-12-30T09:33:46.955+0000 | 291681.7 | 0 | 0 | 0 | 0 | 1 |
C695741309 | 0 | 2020-12-30T09:33:46.888+0000 | 113677.15 | 1 | 0 | 0 | 0 | 0 |
C229919283 | 0 | 2020-12-30T09:33:44.119+0000 | 626.07 | 0 | 0 | 0 | 1 | 0 |
C1958621893 | 0 | 2020-12-30T09:33:43.730+0000 | 5119.67 | 0 | 0 | 0 | 1 | 0 |
C1380271183 | 0 | 2020-12-30T09:33:40.163+0000 | 332163.2 | 0 | 1 | 0 | 0 | 0 |
C1554049844 | 0 | 2020-12-30T09:33:36.674+0000 | 10721.23 | 0 | 0 | 0 | 1 | 0 |
C1423238142 | 0 | 2020-12-30T09:33:36.128+0000 | 104510.4 | 0 | 1 | 0 | 0 | 0 |
C640929948 | 0 | 2020-12-30T09:33:33.398+0000 | 21011.3 | 0 | 0 | 0 | 1 | 0 |
# Generate training data
fs = ws.get_feature_service("fraud_prediction_service")
training_data = fs.get_feature_dataframe(context_df, timestamp_key="timestamp").to_spark()
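A quick sanity check on the joined result can be useful here. A minimal sketch using standard Spark DataFrame methods (the selected columns are the spine columns from above; the exact feature column names depend on the feature service):

```python
# The result should contain the spine columns (user_id, timestamp, isfraud,
# amount, type_*) plus one column per feature served by the feature service.
training_data.printSchema()

# Peek at a few joined rows; in a Databricks notebook you could also use
# display(training_data.limit(5)).
training_data.select("user_id", "timestamp", "isfraud").show(5, truncate=False)
```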
Now we have a Spark dataframe that we can write out to the destination of your choice: flat files, database tables, etc. Here we write a Parquet file to S3; the command below assumes you've mounted an S3 bucket at `/mnt/tecton` using `dbutils.fs.mount()` or equivalent.
training_data.write.mode("overwrite").parquet("/mnt/tecton/fraud_dataset.parquet")
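If you haven't set up the mount yet, a minimal Databricks-specific sketch is below. The bucket name is a placeholder, and in practice you'd typically attach an instance profile to the cluster rather than embedding credentials in the source URI:

```python
# One-time mount of an S3 bucket at /mnt/tecton (Databricks).
# "my-training-data-bucket" is a placeholder; with an instance profile
# attached to the cluster, no credentials are needed in the URI.
dbutils.fs.mount(
    source="s3a://my-training-data-bucket",
    mount_point="/mnt/tecton",
)
```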
That's it! Check out `training.py` to see how you can load this training dataset in Databricks or in the Python environment of your choice, including Jupyter notebooks running on your local machine or SageMaker.
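As a rough illustration of what that loading step might look like outside of Spark (not necessarily what `training.py` does; the bucket name is a placeholder, and pandas needs `pyarrow` or `fastparquet` plus `s3fs` to read `s3://` paths):

```python
import pandas as pd

# Read the Parquet dataset written above directly from S3.
# "my-training-data-bucket" is a placeholder; swap in your own bucket/path.
df = pd.read_parquet("s3://my-training-data-bucket/fraud_dataset.parquet")

# Split into features and label for model training.
X = df.drop(columns=["isfraud", "user_id", "timestamp"])
y = df["isfraud"]
```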