
This article addresses the "ufunc 'isnan' not supported" error that occurs when encoders saved in Snowflake are used for inference on new data. Through detailed code examples and explanations, it shows how to store and load encoders in Snowflake correctly, apply them to transform data, and ultimately run model inference. It also covers how to improve UDF performance using vectorization and caching.
A common challenge when deploying machine learning models on Snowflake is loading and using pre-trained encoders and models inside a user-defined function (UDF). This article digs into why encoders saved in Snowflake (such as a OneHotEncoder and an OrdinalEncoder) raise "ufunc 'isnan' not supported" when applied to new data, and provides the detailed steps and code examples needed to run model inference successfully in a Snowflake environment.
Problem Analysis
The root cause is that when the encoders are loaded from Snowflake and applied to new data, a mismatch in data type or data format leaves NumPy's isnan function unable to handle the input. This typically happens in the following situations:

- Inconsistent data types: the training data and the inference data have different types, e.g. a column that was numeric during training arrives as strings at inference time.
- Improper handling of missing values: the encoder was not fitted to handle missing values, so it fails on unexpected missing values at inference time.
- Wrong API calls: mixing up the Snowpark API with the scikit-learn API, causing the data transformation to fail.
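The error message itself comes from NumPy: np.isnan is only defined for numeric dtypes, so calling it (directly, or indirectly inside an encoder) on an object/string column raises exactly this TypeError. A minimal standalone reproduction, independent of Snowflake:

```python
import numpy as np

# isnan works on numeric arrays
print(np.isnan(np.array([1.0, np.nan])))  # [False  True]

# ...but not on object/string arrays -- this is the error the UDF surfaces
try:
    np.isnan(np.array(["A", None], dtype=object))
except TypeError as exc:
    print(exc)  # ufunc 'isnan' not supported for the input types, ...
```

This is why a column that silently changed from float to string between training and inference is enough to break the whole pipeline.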
Solution
The following steps show how to store, load, and use the encoders correctly for model inference in Snowflake.
1. Data Preparation and Encoder Training
First, prepare the data and fit the encoders. The following code uses Snowpark together with the OneHotEncoder and OrdinalEncoder from snowflake.ml.
from snowflake.snowpark.session import Session
import snowflake.snowpark.functions as F
import numpy as np
from snowflake.ml.modeling.preprocessing import OneHotEncoder, OrdinalEncoder
from snowflake.ml.modeling.xgboost import XGBRegressor
import json

# Create the Snowflake session
connection_parameters = json.load(open('connection.json'))
session = Session.builder.configs(connection_parameters).create()

# Create mock data
mock_df = session.create_dataframe(
    [[979152, "A", "XX", "SUN", "2023-11-24 08:30:00", "2023-11-24 12:25:00", 189, 0.62],
     [987073, "A", "ZZ", "SUN", "2023-12-13 16:15:00", "2023-12-13 11:25:00", 189, 0.75],
     [951384, "C", "YY", "FAR_SUN", "2023-12-05 09:40:00", "2023-12-05 13:35:00", 189, 0.88],
     [952380, "B", "WW", "FAR_SUN", "2023-11-22 19:45:00", "2023-11-22 14:30:00", 235, 0.86],
     [963602, "B", "ZZ", "FAR_SUN", "2023-12-29 10:30:00", "2023-12-29 15:05:00", 235, 0.66]],
    schema=[
        "ID", "AIRPORT", "A_AIRPORT",
        "CATEGORY", "D_DATETIME", "A_DATETIME", "CAPACITY", "TARGET"
    ])
mock_df = mock_df.select_expr(
    "*",
    "TO_TIMESTAMP(D_DATETIME) AS D_DATETIME_T",
    "TO_TIMESTAMP(A_DATETIME) AS A_DATETIME_T")

# Train the OneHotEncoder
ohe = OneHotEncoder(handle_unknown='ignore', input_cols='CATEGORY', output_cols='ROUTE_OHE')
ohe.fit(mock_df)

# Train the OrdinalEncoder
categories = {
    "AIRPORT": np.array(['A', 'B', 'C'])
}
oe = OrdinalEncoder(
    handle_unknown='use_encoded_value', unknown_value=-1,
    encoded_missing_value=-1, input_cols='AIRPORT',
    output_cols='AIRPORT_ENCODE', categories=categories)
oe.fit(mock_df)

# Train the XGBoost model
xgb = XGBRegressor(
    n_estimators=100, max_depth=3,
    input_cols=[
        "AIRPORT_ENCODE", "ROUTE_OHE_FAR_SUN", "ROUTE_OHE_SUN",
        "CAPACITY"
    ],
    label_cols="TARGET", output_cols="xgb_prediction")
xgb.fit(mock_df)
2. Save the Encoders and Model to Snowflake
Next, save the encoders and the model to a Snowflake stage. The objects are serialized with joblib and uploaded to the stage with Snowflake's PUT command (via session.file.put).
from joblib import dump

def save_object(object_, filename, stagename, auto_compress=True):
    dump(object_, filename)
    session.file.put(filename, stagename, overwrite=True, auto_compress=auto_compress)
    return

# Extract the underlying model objects
xgb_model = xgb.to_xgboost()
ohe_obj = ohe.to_sklearn()
oe_obj = oe.to_sklearn()

# Save the objects to the stage
save_object(xgb_model, 'xgb_model.joblib', '@AM_TEST_MODELS')
save_object(ohe_obj, 'one_hot_encode.joblib', '@AM_TEST_MODELS', auto_compress=False)
save_object(oe_obj, 'ordinal_encode.joblib', '@AM_TEST_MODELS', auto_compress=False)
3. Create a UDF for Inference
Create a UDF that loads the encoders and the model and runs inference on new data. The following code loads the objects with joblib and uses them to transform the input.
session.add_import("@AM_TEST_MODELS/xgb_model.joblib.gz")
session.add_import("@AM_TEST_MODELS/one_hot_encode.joblib")
session.add_import("@AM_TEST_MODELS/ordinal_encode.joblib")
session.add_packages("pandas==1.5.3", "joblib==1.2.0", "xgboost==1.7.3", "scikit-learn==1.2.2")

import cachetools

@cachetools.cached(cache={})
def read_file(filename):
    import sys
    import os
    import joblib
    # Get the path where files added through imports are available
    import_dir = sys._xoptions.get("snowflake_import_directory")
    if import_dir:
        with open(os.path.join(import_dir, filename), 'rb') as file:
            m = joblib.load(file)
            return m

from snowflake.snowpark.types import (
    PandasDataFrameType, PandasSeriesType, IntegerType,
    StringType, FloatType, PandasDataFrame, PandasSeries)
import pandas as pd

@F.udf(
    name='predict_target', session=session, replace=True,
    is_permanent=True, stage_location='@AM_TEST_UDFS',
    input_types=[PandasDataFrameType([
        IntegerType(), StringType(), StringType(), StringType(),
        StringType(), StringType(), IntegerType()
    ])],
    return_type=PandasSeriesType(FloatType()))
def predict_target(df: pd.DataFrame) -> pd.Series:
    import sys
    import pandas as pd
    from joblib import load
    import sklearn
    import xgboost as xgb

    IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
    import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]

    # Load the encoders and the model
    ohe = read_file('one_hot_encode.joblib')
    oe = read_file('ordinal_encode.joblib')
    model = read_file('xgb_model.joblib.gz')

    features = [
        "ID", "AIRPORT", "A_AIRPORT",
        "CATEGORY", "D_DATETIME", "A_DATETIME", "CAPACITY"
    ]
    df.columns = features

    # Transform the data with the encoders
    df_ohe = ohe.transform(df[['CATEGORY']])
    encoded_df = pd.DataFrame(df_ohe, columns=ohe.categories_)
    encoded_df.columns = encoded_df.columns.get_level_values(0)
    encoded_df = encoded_df.add_prefix('ROUTE_NAME_OHE_')
    df = pd.concat([df, encoded_df], axis=1)
    df['AIRPORT_ENCODE'] = oe.transform(df[['AIRPORT']])

    # Convert the datetime columns
    df.loc[:, 'D_DATETIME'] = pd.to_datetime(
        df.loc[:, 'D_DATETIME'], format='%Y-%m-%d %H:%M:%S', yearfirst=True)
    df['A_DATETIME'] = pd.to_datetime(
        df['A_DATETIME'], format='%Y-%m-%d %H:%M:%S', yearfirst=True)

    # Create date-derived features
    df['depart_hour'] = df['D_DATETIME'].dt.hour
    df['depart_weekday'] = df['D_DATETIME'].dt.day_of_week + 1
    df['depart_monthday'] = df['D_DATETIME'].dt.day
    df['depart_yearday'] = df['D_DATETIME'].dt.day_of_year
    df['depart_month'] = df['D_DATETIME'].dt.month
    df['depart_year'] = df['D_DATETIME'].dt.year
    df['arrive_hour'] = df['A_DATETIME'].dt.hour

    # Run inference
    pm = pd.Series(model.predict(df[[
        "AIRPORT_ENCODE", "ROUTE_NAME_OHE_FAR_SUN", "ROUTE_NAME_OHE_SUN",
        "CAPACITY", "depart_hour",
        "depart_weekday", "depart_monthday", "depart_yearday",
        "depart_month", "depart_year", "arrive_hour"
    ]]))
    return pm
4. Run Inference with the UDF
Finally, call the newly created UDF on new data.
from snowflake.snowpark.functions import col

# Create inference data
inference_df = session.create_dataframe(
    [[979152, "C", "ZZ", "SUN", "2023-11-01 16:30:00", "2023-11-01 20:25:00", 189],
     [987073, "C", "ZZ", "SUN", "2023-12-18 19:15:00", "2023-12-18 22:25:00", 189],
     [951384, "A", "YY", "FAR_SUN", "2023-12-06 15:40:00", "2023-12-06 17:35:00", 189],
     [952380, "A", "WW", "FAR_SUN", "2023-11-22 10:45:00", "2023-11-22 14:30:00", 235],
     [963602, "B", "WW", "FAR_SUN", "2023-11-30 13:30:00", "2023-12-29 15:05:00", 235]],
    schema=[
        "ID", "AIRPORT", "A_AIRPORT",
        "CATEGORY", "D_DATETIME", "A_DATETIME", "CAPACITY"
    ])

# Call the UDF for inference
inference_df.withColumn(
    'PREDICTED_TARGET',
    predict_target(inference_df)).show()
Notes
- Data type consistency: make sure the training data and the inference data have the same types.
- Missing values: handle missing and unseen values when fitting the encoders, for example with handle_unknown='ignore'.
- API calls: inside the UDF, transform data with the scikit-learn API, not the Snowpark API.
- Column-name case: Snowflake upper-cases column names when storing tables, so reference columns accordingly in the UDF.
- Vectorization: prefer vectorized UDFs to improve inference performance.
- Caching: cache loaded encoders and models with the @cachetools.cached decorator to avoid reloading them on every call.
- Package dependencies: add every Python package the UDF needs in Snowflake, such as pandas, joblib, xgboost, and scikit-learn.
Summary
This article showed how to correctly store, load, and use encoders for model inference in Snowflake. Following these steps and caveats avoids the "ufunc 'isnan' not supported" error and lets you deploy machine learning models successfully in a Snowflake environment, while vectorization and caching noticeably improve UDF performance and inference speed.