Skip to main content

Documentation Index

Fetch the complete documentation index at: https://www.truefoundry.com/llms.txt

Use this file to discover all available pages before exploring further.

Snowflake tasks run SQL against Snowflake from your workflow. Authentication uses Snowflake key-pair authentication. For background on generating keys, assigning the public key to a Snowflake user, and rotation, see Snowflake’s Key-pair authentication and key-pair rotation guide. Install the workflow SDK with the Snowflake extra:
pip install -U "truefoundry[workflow,snowflake]"

Configure the Snowflake private key

The private key must be available in two places: task pods and the Flyte agent.

Task pods

Create a TrueFoundry secret with group name private-key and key snowflake, and store your Snowflake RSA private key (PEM) as that secret’s value. See Manage secrets for how to create secrets in TrueFoundry. Reference the secret in task mounts using its FQN, for example: tfy-secret://truefoundry:private-key:snowflake Mount it on any @task that uses Snowflake StructuredDataset URIs (see the example below).

Flyte agent

For the Flyte agent, create and use a TrueFoundry secret for the Snowflake private key, then wire that secret into the Flyte agent secret via kustomize. Set Flyte agent values:
flyteagent:
  image:
    repository: tfy.jfrog.io/tfy-images/tfy-workflow-agent
    tag: v0.82.0
  enabled: true
  agentSecret:
    secretData:
      secretData: {}
Then add/update the following in kustomize:
- kind: Secret
  type: Opaque
  metadata:
    name: flyteagent
  apiVersion: v1
  stringData:
    snowflake_private_key: tfy-secret://internal:private-key:snowflake

SnowflakeTaskConfig

user
str
required
Snowflake user name (the user whose RSA public key is registered in Snowflake).
account
str
required
Snowflake account identifier (for example the value in your account URL).
database
str
required
Default Snowflake database for the task.
schema_name
str
required
Default Snowflake schema for the task.
warehouse
str
required
Snowflake warehouse to use for the session.

SnowflakeTask

Define a Snowflake task with a SQL query template and task config.

Example

This workflow runs a select, prints the result, inserts a row, writes a small table via a Python task using a Snowflake StructuredDataset URI, and runs another select.
import pandas as pd

from flytekit import StructuredDataset, kwtypes
from truefoundry.deploy import SecretMount
from truefoundry.workflow import (
    PythonTaskConfig,
    TaskPythonBuild,
    task,
    workflow,
)
from truefoundry.workflow.snowflake_task import SnowflakeTask, SnowflakeTaskConfig

SNOWFLAKE_USER = "user1"
SNOWFLAKE_ACCOUNT = "ABCDEF-AAA111"
SNOWFLAKE_WAREHOUSE = "TEST_WAREHOUSE_1"
SNOWFLAKE_DATABASE = "DEMO_DB"
SNOWFLAKE_SCHEMA = "PUBLIC"

image = TaskPythonBuild(
    pip_packages=["pandas", "pyarrow", "truefoundry[workflow,snowflake]"],
)

insert_query = SnowflakeTask(
    name="insert-query",
    inputs=kwtypes(id=int, name=str, age=int),
    query_template="""
        INSERT INTO DEMO_DB.PUBLIC.test (ID, NAME, AGE)
        VALUES (%(id)s, %(name)s, %(age)s);
    """,
    task_config=SnowflakeTaskConfig(
        user=SNOWFLAKE_USER,
        account=SNOWFLAKE_ACCOUNT,
        database=SNOWFLAKE_DATABASE,
        schema_name=SNOWFLAKE_SCHEMA,
        warehouse=SNOWFLAKE_WAREHOUSE,
    ),
)

select_query = SnowflakeTask(
    name="select-query-basic",
    query_template=f"""
        SELECT * FROM {SNOWFLAKE_DATABASE}.{SNOWFLAKE_SCHEMA}.TEST;
    """,
    output_schema_type=StructuredDataset,
    task_config=SnowflakeTaskConfig(
        user=SNOWFLAKE_USER,
        account=SNOWFLAKE_ACCOUNT,
        database=SNOWFLAKE_DATABASE,
        schema_name=SNOWFLAKE_SCHEMA,
        warehouse=SNOWFLAKE_WAREHOUSE,
    ),
)


@task(
    task_config=PythonTaskConfig(
        image=image,
        mounts=[
            SecretMount(
                type="secret",
                mount_path="/etc/secrets/private-key/snowflake",
                secret_fqn="tfy-secret://truefoundry:private-key:snowflake",
            )
        ],
    ),
)
def print_head(input_sd: StructuredDataset) -> pd.DataFrame:
    df = input_sd.open(pd.DataFrame).all()
    print(df)
    return df


@task(
    task_config=PythonTaskConfig(
        image=image,
        mounts=[
            SecretMount(
                type="secret",
                mount_path="/etc/secrets/private-key/snowflake",
                secret_fqn="tfy-secret://truefoundry:private-key:snowflake",
            )
        ],
    ),
)
def write_table() -> StructuredDataset:
    df = pd.DataFrame(
        {"ID": [1, 2, 3], "NAME": ["flyte", "is", "amazing"], "AGE": [30, 30, 30]}
    )
    print(df)

    user = SNOWFLAKE_USER
    account = SNOWFLAKE_ACCOUNT
    database = SNOWFLAKE_DATABASE
    schema = SNOWFLAKE_SCHEMA
    warehouse = SNOWFLAKE_WAREHOUSE
    table = "TEST"
    uri = f"snowflake://{user}:{account}/{warehouse}/{database}/{schema}/{table}"

    return StructuredDataset(dataframe=df, uri=uri)


@workflow
def wf():
    select_task_1 = select_query()
    print_task = print_head(select_task_1)
    insert_query_task = insert_query(id=100, name="Flyte100", age=100)
    write_task = write_table()
    select_task_2 = select_query()

    select_task_1 >> print_task >> insert_query_task >> write_task >> select_task_2


if __name__ == "__main__":
    wf()

Checklist

  • Configure key-pair authentication in Snowflake and store the private key securely; see Snowflake key-pair authentication.
  • Create a TrueFoundry secret with group private-key and key snowflake, and mount it using an FQN such as tfy-secret://truefoundry:private-key:snowflake (see Manage secrets).
  • Enable flyteagent, set agentSecret.secretData.secretData: {} in values, and provide snowflake_private_key: tfy-secret://internal:private-key:snowflake via a kustomize Secret named flyteagent.
  • Install truefoundry[workflow,snowflake] and use SnowflakeTask + SnowflakeTaskConfig for SQL; mount the private key on Python tasks that use Snowflake StructuredDataset URIs.