Skip to content

Curated examples and patterns for using Chalk. Use these to build your feature pipelines.

License

Notifications You must be signed in to change notification settings

chalk-ai/examples

Repository files navigation

Examples

Curated examples and patterns for using Chalk. Use these to build your feature pipelines.

Declare and version features from Python.

@features(owner="librarians@sfpl.org")
class Author:
    id: str
    # The Author's email
    # :tags: pii
    name: str
    nickname: str | None
    books: DataFrame[Book] = has_many(lambda: Book.author_id == Author.id)

Resolvers are Python functions that compute your feature values. They take as input features that they need to know to run, and resolve the values of one or more features. In both cases, you use Python type annotations to define these dependencies and outputs.

@realtime
def get_email_domain(email: User.email) -> User.email_domain:
    return email.split("@")[1].lower()

@realtime
def is_banned_email(domain: User.email_domain) -> User.banned_email:
    return domain in {"pergi.id", ...}

https://docs.chalk.ai/docs/resolver-overview

When a feature is expensive or slow to compute, you may wish to cache its value. Chalk uses the terminology "maximum staleness" to describe how recently a feature value needs to have been computed to be returned without re-running a resolver.

https://docs.chalk.ai/docs/feature-caching

@features
class User:
    fico_score: int = feature(max_staleness="30d")

ChalkClient().query(
    input={...},
    output=[User.fico_score],
    staleness={User.fico_score: "10m"},
)

https://docs.chalk.ai/docs/feature-caching

Run resolvers on a schedule, sampling values for the inputs.

def only_active_filter(last_login: User.last_login, status: User.status) -> bool:
    return status == "active" and last_login > datetime.now() - timedelta(days=30)

@realtime(cron=Cron(schedule="29d 11h", filter=only_active_filter))
def get_fico_score(name: User.name, email: User.email) -> User.fico_score:
    return requests.get("https://experian.com").json()["score"]

https://docs.chalk.ai/docs/resolver-cron

Capture metadata to inform alerting, monitoring, and discovery.

@features(owner="shuttle@nasa.gov", tags="group:rocketry")
class SpaceShuttle:
    id: str

    # The SHA1 of the software deployed to the shuttle.
    # Should align with a git commit on main.
    #
    # :owner: katherine.johnson@nasa.gov
    software_version: str

    # The volume of this shuttle in square meters.
    # :owner: architecture@nasa.gov
    # :tags: zillow-fact, size
    volume: str

https://docs.chalk.ai/docs/feature-discovery

A Chalk DataFrame is a 2-dimensional data structure similar to pandas.Dataframe, but with richer types and underlying optimizations.

User.txns[
    Transaction.amount < 0,
    Transaction.merchant in {"uber", "lyft"} or Transaction.memo == "uberpmts",
    Transaction.canceled_at is None,
    Transaction.amount
].sum()

https://docs.chalk.ai/docs/dataframe

Chalk gives a simple way to express streaming computations, for both mapping stream and window aggregate streams.

@stream(...)
def failed_logins(events: DataFrame[LoginMessage]) -> DataFrame[
    User.id,
    User.num_failed_logins
]:
    return f"""
        select
          user_id as id,
          count(*) as num_failed_logins
        from {events}
        where failed = 1
        group by 1
    """

https://docs.chalk.ai/docs/streams

https://docs.chalk.ai/docs/aggregations

Test your Chalk features and resolvers.

@realtime
def get_home_data(
        hid: HomeFeatures.id,
) -> Features[HomeFeatures.price, HomeFeatures.sq_ft]:
    return HomeFeatures(price=200_000, sq_ft=2_000)

def test_multiple_output():
    assert get_home_data(2) == HomeFeatures(
        price=200_000,
        sq_ft=2_000,
    )

https://docs.chalk.ai/docs/unit-tests

Deploy feature pipelines in GitHub Actions.

Docs: https://docs.chalk.ai/docs/github-actions

CLI Step: https://github.com/chalk-ai/cli-action

Deploy Step: https://github.com/chalk-ai/deploy-action

- uses: chalk-ai/deploy-action@v2
  with:
    client-id: ${{secrets.CHALK_CLIENT_ID}}
    client-secret: ${{secrets.CHALK_CLIENT_SECRET}}
    await: true
    no-promote: true

Docs on migrations coming soon!

Chalk can ingest your data using a SQL interface from any of the integrations that support it. You can describe your queries using SQL strings or SQLAlchemy.

@realtime
def get_views() -> DataFrame[User]:
    return db.query_string(
        """
        select id, sum(mins) as viewed_minutes
        from view_counts
        group by id
        """,
    ).all()

https://docs.chalk.ai/docs/sql

Chalk can help you build insight into the financial transactions of your users.

@realtime
def tradeline_rollup(
        accounts: User.tradelines[
            Tradeline.is_delinquent is True
            ]
) -> User.delinquent_amount:
    return accounts[Tradeline.outstanding].sum()

http://chalk.ai/solutions/credit

Finding a balance between user experience and risk management is a complex task for banking products. Chalk helps you express complex business logic with features and resolvers, and lets data scientists and machine learning engineers collaborate on solutions.

@realtime(when=TransferLimit.to_account.is_internal is False)
def withdrawal_limit(
        internal_accounts: TransferLimit.user.accounts[Account.is_internal is True],
        deposits_last_90: TransferLimit.user.transfers[Transfer.from_account.is_internal is False, before(days_ago=90)],
        user_settlement: TransferLimit.user.holdback,
) -> TransferLimit.amount:
    ...

@stream(...)
def agg_logins(df: DataFrame[LoginMessage]) -> DataFrame[User]:
    return f"""
        select
            count(*) as failed_logins,
            user_id as id
        from {df}
        where status = 'failed'
        group by id
    """

http://chalk.ai/solutions/fraud

Predicting device failure requires complex analysis executed against a variety of data sources. Chalk's platform allows data scientists to bring all the different data together, including streaming data from devices.

@stream(...)
def process_measurements(df: DataFrame[Message]) -> DataFrame[Sensor]:
    return f"""
        select
            count(*) as count_failed,
            id as device_id
        from {df}
        where is_failing <> TRUE
        group by id
    """

http://chalk.ai/solutions/maintenance

About

Curated examples and patterns for using Chalk. Use these to build your feature pipelines.

Topics

Resources

License

Security policy

Stars

Watchers

Forks

Languages