## ----setup, include = FALSE---------------------------------------------------
knitr::opts_chunk$set(
  collapse = TRUE,
  comment = "#>"
)

## -----------------------------------------------------------------------------
library(DBI)
library(RSQLite)
library(featdelta)

con <- dbConnect(SQLite(), ":memory:")

readings <- data.frame(
  reading_id = 1:10,
  device_id = c("A", "A", "B", "C", "A", "B", "C", "A", "B", "C"),
  temperature = c(68, 71, 75, 70, 83, 78, 72, 88, 81, 74),
  vibration = c(0.12, 0.15, 0.31, 0.18, 0.44, 0.39, 0.22, 0.55, 0.41, 0.24),
  pressure = c(31, 33, 36, 32, 39, 37, 34, 42, 38, 35),
  runtime_hours = c(120, 125, 210, 88, 130, 216, 93, 136, 223, 97),
  stringsAsFactors = FALSE
)

day_one <- 1:6
day_two <- 7:10

dbWriteTable(
  con,
  "raw_readings",
  readings[day_one, ],
  overwrite = TRUE
)

source_sql <- "
  SELECT
    reading_id,
    device_id,
    temperature,
    vibration,
    pressure,
    runtime_hours
  FROM raw_readings
  ORDER BY reading_id
"

key <- "reading_id"

dbGetQuery(con, source_sql)

## -----------------------------------------------------------------------------
defs <- fd_define(
  temp_above_80 = temperature > 80,
  vibration_score = vibration * runtime_hours,
  pressure_per_hour = pressure / runtime_hours,
  maintenance_flag = temp_above_80 | vibration_score > 80
)

raw_preview <- dbGetQuery(con, source_sql)

features_preview <- fd_compute(
  data = raw_preview,
  defs = defs,
  key = key
)

features_preview

## -----------------------------------------------------------------------------
run_initial <- fd_run(
  con = con,
  sql = source_sql,
  defs = defs,
  key = key,
  feat_table_name = "reading_features",
  verbose = FALSE
)

dbGetQuery(con, "SELECT * FROM reading_features ORDER BY reading_id")

## -----------------------------------------------------------------------------
dev_report <- fd_run(
  con = con,
  sql = source_sql,
  defs = defs,
  key = key,
  feat_table_name = "reading_features_dev",
  fetch_limit = 3,
  return_data = "both",
  preview_n = 2,
  verbose = FALSE
)

dev_report$preview$raw
dev_report$preview$features
dev_report$data$features

## -----------------------------------------------------------------------------
run_initial$success
run_initial$stage
run_initial$fetch$n_rows
run_initial$compute$feature_names
run_initial$upsert$counts

## -----------------------------------------------------------------------------
bad_defs <- fd_define(
  temp_above_80 = temperature > 80,
  broken_feature = missing_sensor_column / 10
)

failed_report <- fd_run(
  con = con,
  sql = source_sql,
  defs = bad_defs,
  key = key,
  feat_table_name = "reading_features_broken",
  fail_fast = FALSE,
  return_data = "both",
  verbose = FALSE
)

failed_report$success
failed_report$stage
failed_report$error
failed_report$compute$report

## ----eval = FALSE-------------------------------------------------------------
# fd_run(
#   con = con,
#   sql = source_sql,
#   defs = bad_defs,
#   key = key,
#   feat_table_name = "reading_features",
#   fail_fast = TRUE
# )

## -----------------------------------------------------------------------------
dbAppendTable(con, "raw_readings", readings[day_two, ])

run_incremental <- fd_run(
  con = con,
  sql = source_sql,
  defs = defs,
  key = key,
  feat_table_name = "reading_features",
  verbose = FALSE
)

# The default fetch mode is "new_only".
run_incremental$fetch$mode

# Only the four new day-two readings were fetched.
run_incremental$fetch$n_rows

# Four rows were inserted, and no existing feature rows were updated.
run_incremental$upsert$counts

## -----------------------------------------------------------------------------
defs_v2 <- fd_define(
  temp_above_80 = temperature > 80,
  vibration_score = vibration * runtime_hours,
  pressure_per_hour = pressure / runtime_hours,
  maintenance_flag = temp_above_80 | vibration_score > 80,
  high_pressure = pressure >= 38
)

run_refresh <- fd_run(
  con = con,
  sql = source_sql,
  defs = defs_v2,
  key = key,
  feat_table_name = "reading_features",
  fetch_mode = "all",
  verbose = FALSE
)

# Refresh mode fetches all rows returned by the source query.
run_refresh$fetch$mode
run_refresh$fetch$n_rows

# All rows already existed in the feature table, so they were updated.
# No new rows were inserted.
run_refresh$upsert$counts

## -----------------------------------------------------------------------------
defs_v3 <- fd_define(
  temp_above_80 = temperature > 80,
  vibration_score = vibration * runtime_hours,
  maintenance_flag = temp_above_80 | vibration_score > 80,
  high_pressure = pressure >= 38
)

run_removed_column <- fd_run(
  con = con,
  sql = source_sql,
  defs = defs_v3,
  key = key,
  feat_table_name = "reading_features",
  fetch_mode = "all",
  verbose = FALSE
)

# All ten source rows already existed in the feature table, so they were
# counted as updates rather than inserts.
run_removed_column$upsert$counts

# The database table still contains the old `pressure_per_hour` column.
dbGetQuery(con, "pragma table_info(reading_features)")

## -----------------------------------------------------------------------------
defs_v4 <- fd_define(
  temp_above_80 = temperature > 80,
  vibration_score = vibration * runtime_hours,
  maintenance_flag = temp_above_80 | vibration_score > 80,
  high_pressure = pressure >= 38,
  thermal_load = temperature * runtime_hours
)

alter_error <- tryCatch(
  fd_run(
    con = con,
    sql = source_sql,
    defs = defs_v4,
    key = key,
    feat_table_name = "reading_features",
    fetch_mode = "all",
    alter_table = FALSE,
    verbose = FALSE
  ),
  error = function(e) conditionMessage(e)
)

alter_error

## -----------------------------------------------------------------------------
insert_only_error <- tryCatch(
  fd_run(
    con = con,
    sql = source_sql,
    defs = defs_v3,
    key = key,
    feat_table_name = "reading_features",
    fetch_mode = "all",
    update_table = FALSE,
    verbose = FALSE
  ),
  error = function(e) conditionMessage(e)
)

insert_only_error

## ----eval = FALSE-------------------------------------------------------------
# fd_run(
#   con = con,
#   sql = source_sql,
#   defs = defs,
#   key = key,
#   feat_table_name = "reading_features",
#   chunk_size = 10000
# )

## ----cleanup, include = FALSE-------------------------------------------------
dbDisconnect(con)

