PHPFixing

Monday, August 15, 2022

[FIXED] How do I get a dataframe or database write from TFX BulkInferrer?

Tags: database, output, tensorflow, tfx

Issue

I'm very new to TFX, but have an apparently-working ML Pipeline which is to be used via BulkInferrer. That seems to produce output exclusively in Protobuf format, but since I'm running bulk inference I want to pipe the results to a database instead. (DB output seems like it should be the default for bulk inference, since both Bulk Inference & DB access take advantage of parallelization... but Protobuf is a per-record, serialized format.)

I assume I could use something like Parquet-Avro-Protobuf to do the conversion (though that's in Java and the rest of the pipeline is in Python). Alternatively, I could write something myself to consume all the protobuf messages one by one, convert them into JSON, deserialize the JSON into a list of dicts, and load that list into a Pandas DataFrame, or store it as a bunch of key-value pairs which I treat like a single-use DB. But that sounds like a lot of work and pain involving parallelization and optimization for a very common use case. The top-level Protobuf message definition is TensorFlow's PredictionLog.

This must be a common use case, because TensorFlow Model Analysis functions like this one consume Pandas DataFrames. I'd rather be able to write directly to a DB (preferably Google BigQuery), or a Parquet file (since Parquet / Spark seems to parallelize better than Pandas), and again, those seem like they should be common use cases, but I haven't found any examples. Maybe I'm using the wrong search terms?

I also looked at the PredictExtractor, since "extracting predictions" sounds close to what I want... but the official documentation appears silent on how that class is supposed to be used. I thought TFTransformOutput sounded like a promising verb, but instead it's a noun.

I'm clearly missing something fundamental here. Is there a reason no one wants to store BulkInferrer results in a database? Is there a configuration option that allows me to write the results to a DB? Maybe I want to add a ParquetIO or BigQueryIO instance to the TFX pipeline? (TFX docs say it uses Beam "under the hood" but that doesn't say much about how I should use them together.) But the syntax in those documents looks sufficiently different from my TFX code that I'm not sure if they're compatible?

Help?


Solution

(Copied from the related issue for greater visibility)

After some digging, here is an alternative approach, which assumes no knowledge of the feature_spec beforehand. Do the following:

  • Set the BulkInferrer to write to output_examples rather than inference_result by adding an output_example_spec to the component construction.
  • Add a StatisticsGen and a SchemaGen component in the main pipeline right after the BulkInferrer to generate a schema for the aforementioned output_examples.
  • Use the artifacts from SchemaGen and BulkInferrer to read the TFRecords and do whatever is necessary.
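
from tfx.components import BulkInferrer, SchemaGen, StatisticsGen
from tfx.proto import bulk_inferrer_pb2

bulk_inferrer = BulkInferrer(
    # .... (remaining arguments omitted in the original)
    output_example_spec=bulk_inferrer_pb2.OutputExampleSpec(
        output_columns_spec=[bulk_inferrer_pb2.OutputColumnsSpec(
            predict_output=bulk_inferrer_pb2.PredictOutput(
                output_columns=[bulk_inferrer_pb2.PredictOutputCol(
                    output_key='original_label_name',
                    output_column='output_label_column_name')]))]))

# generate statistics over the examples written by the BulkInferrer
statistics = StatisticsGen(
    examples=bulk_inferrer.outputs.output_examples)

# infer a schema from those statistics
schema = SchemaGen(
    statistics=statistics.outputs.output)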

After that, one can do the following:

import tensorflow as tf
from tfx.utils import io_utils
from tensorflow_transform.tf_metadata import schema_utils

# read schema from SchemaGen
schema_path = '/path/to/schemagen/schema.pbtxt'
schema_proto = io_utils.SchemaReader().read(schema_path)
spec = schema_utils.schema_as_feature_spec(schema_proto).feature_spec

# read inferred results
data_files = ['/path/to/bulkinferrer/output_examples/examples/examples-00000-of-00001.gz']
dataset = tf.data.TFRecordDataset(data_files, compression_type='GZIP')

# parse dataset with spec
def parse(raw_record):
    return tf.io.parse_example(raw_record, spec)

dataset = dataset.map(parse)

At this point, the dataset is like any other parsed dataset, so it's trivial to write it to a CSV file, a BigQuery table, or anywhere else from there. It certainly helped us in ZenML with our BatchInferencePipeline.



Answered By - Hamza Tahir
Answer Checked By - Marilyn (PHPFixing Volunteer)