Skip to main content

Starlark Script Examples

This page provides complete, working examples of Starlark scripts for common use cases.

Weather API Script

Fetch current weather data from WeatherAPI.com.

# @description: Get current weather for a city
# @author: OpenPact Team
# @version: 1.0.0
# @secrets: WEATHER_API_KEY

def get_weather(city):
    """
    Look up the current conditions for a location on WeatherAPI.com.

    Args:
        city: Location query sent as the `q` parameter, e.g. a city name
            ("London") or a "lat,lon" pair ("51.5,-0.1")

    Returns:
        On success, a dict with location fields (city, region, country,
        local_time) and current readings (temperature, feels-like,
        condition text, humidity, wind, UV index). Otherwise a dict with
        an "error" key, plus "status" when the response was not a 200.
    """
    key = secrets.get("WEATHER_API_KEY")
    if not key:
        return {"error": "WEATHER_API_KEY not configured"}

    endpoint = "https://api.weatherapi.com/v1/current.json?key=%s&q=%s&aqi=no"
    resp = http.get(format(endpoint, key, city))

    status = resp["status"]
    if status != 200:
        return {
            "error": "Weather API request failed",
            "status": status,
        }

    payload = json.decode(resp["body"])
    location = payload["location"]
    current = payload["current"]

    # Flatten the nested API response into a single-level dict.
    return {
        "city": location["name"],
        "region": location["region"],
        "country": location["country"],
        "local_time": location["localtime"],
        "temp_c": current["temp_c"],
        "temp_f": current["temp_f"],
        "feels_like_c": current["feelslike_c"],
        "feels_like_f": current["feelslike_f"],
        "condition": current["condition"]["text"],
        "humidity": current["humidity"],
        "wind_kph": current["wind_kph"],
        "wind_mph": current["wind_mph"],
        "wind_dir": current["wind_dir"],
        "uv_index": current["uv"],
    }

def get_forecast(city, days):
    """
    Retrieve a multi-day forecast for a location from WeatherAPI.com.

    Args:
        city: City name sent as the `q` parameter
        days: How many days to request; must be between 1 and 10 inclusive

    Returns:
        Dict with "city", "country" and a "forecast" list holding one
        entry per day (date, max/min temperature in Celsius, condition
        text, chance of rain and snow), or a dict with an "error" key.
    """
    key = secrets.get("WEATHER_API_KEY")
    if not key:
        return {"error": "WEATHER_API_KEY not configured"}

    # Reject out-of-range requests before spending an API call.
    if days > 10 or days < 1:
        return {"error": "Days must be between 1 and 10"}

    endpoint = "https://api.weatherapi.com/v1/forecast.json?key=%s&q=%s&days=%d&aqi=no"
    resp = http.get(format(endpoint, key, city, days))

    status = resp["status"]
    if status != 200:
        return {"error": "Forecast API request failed", "status": status}

    payload = json.decode(resp["body"])

    # One summary dict per forecast day, in the order the API returned them.
    daily = [
        {
            "date": entry["date"],
            "max_temp_c": entry["day"]["maxtemp_c"],
            "min_temp_c": entry["day"]["mintemp_c"],
            "condition": entry["day"]["condition"]["text"],
            "chance_of_rain": entry["day"]["daily_chance_of_rain"],
            "chance_of_snow": entry["day"]["daily_chance_of_snow"],
        }
        for entry in payload["forecast"]["forecastday"]
    ]

    return {
        "city": payload["location"]["name"],
        "country": payload["location"]["country"],
        "forecast": daily,
    }

# Default execution: fetch London's current weather when the script is
# evaluated and expose the outcome as the top-level `result` variable.
result = get_weather("London")

Usage:

Tool: script_run
Args: {"name": "weather", "function": "get_weather", "args": ["Tokyo"]}

Stock Price Script

Fetch stock quotes from Alpha Vantage.

# @description: Get stock price and market data
# @author: OpenPact Team
# @version: 1.0.0
# @secrets: ALPHA_VANTAGE_API_KEY

def get_quote(symbol):
    """
    Fetch the latest Alpha Vantage GLOBAL_QUOTE entry for one ticker.

    Args:
        symbol: Stock ticker symbol (e.g., "AAPL", "GOOGL"); it is
            upper-cased and stripped before use

    Returns:
        Dict with the parsed quote (prices as floats, volume as int,
        plus a "fetched_at" timestamp), or a dict with an "error" key
    """
    key = secrets.get("ALPHA_VANTAGE_API_KEY")
    if not key:
        return {"error": "ALPHA_VANTAGE_API_KEY not configured"}

    # Normalise the ticker and reject empty or over-long symbols.
    ticker = symbol.upper().strip()
    if not ticker or len(ticker) > 10:
        return {"error": "Invalid stock symbol"}

    endpoint = "https://www.alphavantage.co/query?function=GLOBAL_QUOTE&symbol=%s&apikey=%s"
    resp = http.get(format(endpoint, ticker, key))

    status = resp["status"]
    if status != 200:
        return {"error": "API request failed", "status": status}

    payload = json.decode(resp["body"])

    # A missing or empty "Global Quote" object is reported as an error.
    quote = payload["Global Quote"] if "Global Quote" in payload else None
    if not quote:
        return {"error": "Symbol not found or API limit reached", "symbol": ticker}

    # The API delivers every figure as a string under numbered keys.
    return {
        "symbol": quote["01. symbol"],
        "price": float(quote["05. price"]),
        "open": float(quote["02. open"]),
        "high": float(quote["03. high"]),
        "low": float(quote["04. low"]),
        "volume": int(quote["06. volume"]),
        "previous_close": float(quote["08. previous close"]),
        "change": float(quote["09. change"]),
        "change_percent": quote["10. change percent"],
        "latest_trading_day": quote["07. latest trading day"],
        "fetched_at": time.now(),
    }

def get_multiple_quotes(symbols):
    """
    Look up a quote for each ticker in turn.

    Args:
        symbols: List of stock ticker symbols

    Returns:
        Dict with "quotes" (the get_quote result for each symbol, keyed
        by the symbol as given) and "count" (number of entries)
    """
    quotes = {}

    for ticker in symbols:
        # Pause before every request except the first to avoid rate limiting.
        if quotes:
            time.sleep(1)

        quotes[ticker] = get_quote(ticker)

    return {"quotes": quotes, "count": len(quotes)}

# Default execution: fetch the AAPL quote when the script is evaluated and
# expose the outcome as the top-level `result` variable.
result = get_quote("AAPL")

Usage:

Tool: script_run
Args: {"name": "stocks", "function": "get_quote", "args": ["MSFT"]}

Notification Script

Send notifications via webhooks (Slack, Discord, etc.).

# @description: Send notifications to various platforms
# @author: OpenPact Team
# @version: 1.0.0
# @secrets: SLACK_WEBHOOK_URL, DISCORD_WEBHOOK_URL

def send_slack(message, channel=None):
    """
    Post a text message to the configured Slack incoming webhook.

    Args:
        message: The message text to send
        channel: Optional channel to include in the payload

    Returns:
        Dict with "success": True, "platform" and "sent_at" when the
        webhook answers 200; otherwise a dict describing the failure
    """
    hook = secrets.get("SLACK_WEBHOOK_URL")
    if not hook:
        return {"error": "SLACK_WEBHOOK_URL not configured"}

    body = {"text": message}
    if channel:
        body["channel"] = channel

    resp = http.post(
        hook,
        body=json.encode(body),
        headers={"Content-Type": "application/json"},
    )

    status = resp["status"]
    if status == 200:
        return {
            "success": True,
            "platform": "slack",
            "sent_at": time.now(),
        }

    # Any other status is reported back together with the code received.
    return {
        "success": False,
        "error": "Slack webhook failed",
        "status": status,
    }

def send_discord(message, username=None):
    """
    Post a message to the configured Discord webhook.

    Args:
        message: The message content to send
        username: Optional username to include in the payload

    Returns:
        Dict with "success": True, "platform" and "sent_at" when the
        webhook answers 200 or 204; otherwise a dict describing the failure
    """
    hook = secrets.get("DISCORD_WEBHOOK_URL")
    if not hook:
        return {"error": "DISCORD_WEBHOOK_URL not configured"}

    body = {"content": message}
    if username:
        body["username"] = username

    resp = http.post(
        hook,
        body=json.encode(body),
        headers={"Content-Type": "application/json"},
    )

    # Discord returns 204 No Content on success, so accept it alongside 200.
    status = resp["status"]
    if status in [200, 204]:
        return {
            "success": True,
            "platform": "discord",
            "sent_at": time.now(),
        }

    return {
        "success": False,
        "error": "Discord webhook failed",
        "status": status,
    }

def send_generic_webhook(url, data, headers=None):
    """
    POST a JSON-encoded payload to an arbitrary HTTPS webhook.

    Args:
        url: The webhook URL; anything not starting with "https://" is rejected
        data: Dict of data to send as JSON
        headers: Optional extra headers; entries override the default
            Content-Type when they use the same key

    Returns:
        Dict with "success" (True for any 2xx status), the "status" code
        and a "sent_at" timestamp, or a dict with an "error" key
    """
    if not url.startswith("https://"):
        return {"error": "Webhook URL must use HTTPS"}

    # Start from the JSON content type and layer caller headers on top.
    merged = {"Content-Type": "application/json"}
    if headers:
        merged.update(headers)

    resp = http.post(url, body=json.encode(data), headers=merged)
    status = resp["status"]

    return {
        "success": 200 <= status and status < 300,
        "status": status,
        "sent_at": time.now(),
    }

# Default execution - test Slack: sends a test message when the script is
# evaluated and exposes the outcome as the top-level `result` variable.
result = send_slack("Test notification from OpenPact")

Usage:

Tool: script_run
Args: {"name": "notification", "function": "send_slack", "args": ["Server deployment complete!"]}

Data Transformation Script

Transform and aggregate data from multiple sources.

# @description: Data transformation utilities
# @author: OpenPact Team
# @version: 1.0.0
# @secrets: none

def parse_csv(csv_string, has_header=True):
    """
    Parse a CSV string into rows.

    Parsing is deliberately simple: lines are split on newlines and cells
    on commas, so quoted fields containing commas are not supported. Every
    cell is stripped of surrounding whitespace.

    Args:
        csv_string: The CSV data as a string
        has_header: Whether the first row is a header

    Returns:
        {"data": rows, "row_count": n} where rows is a list of dicts keyed
        by the header cells (if has_header) or a list of lists of cells.
        With a header, missing cells become "" and cells beyond the header
        width are dropped. Returns {"error": "Empty CSV data"} when the
        input is empty or contains only whitespace.
    """
    text = csv_string.strip()

    # split() always yields at least one element, so emptiness has to be
    # checked on the text itself rather than on the list of lines.
    if not text:
        return {"error": "Empty CSV data"}

    rows = [
        [cell.strip() for cell in line.split(",")]
        for line in text.split("\n")
    ]

    if not has_header:
        return {"data": rows, "row_count": len(rows)}

    header = rows[0]
    records = []
    for values in rows[1:]:
        record = {}
        for index, column in enumerate(header):
            # Short rows are padded with empty strings.
            record[column] = values[index] if index < len(values) else ""
        records.append(record)

    return {"data": records, "row_count": len(records)}

def _is_ascii_digits(text):
    """Report whether text is non-empty and made only of the characters 0-9."""
    if not text:
        return False
    for i in range(len(text)):
        if text[i] not in "0123456789":
            return False
    return True

def _as_float_or_none(value):
    """Convert value to a float, or return None if it is not a decimal number."""
    text = str(value).strip().lower()

    if text[:1] in ("+", "-"):
        unsigned = text[1:]
    else:
        unsigned = text

    # Optional exponent part: e / e+ / e- followed by at least one digit.
    mantissa, marker, exponent = unsigned.partition("e")
    if marker:
        if exponent[:1] in ("+", "-"):
            exponent = exponent[1:]
        if not _is_ascii_digits(exponent):
            return None

    # Mantissa: digits with at most one decimal point and at least one digit.
    whole, _, fraction = mantissa.partition(".")
    if not _is_ascii_digits(whole + fraction):
        return None

    return float(text)

def aggregate_numbers(data, key):
    """
    Calculate statistics for a numeric field.

    Values are validated before conversion because Starlark has no
    try/except: a failing float() call would abort the whole script.
    Items without the key, and values that are not plain decimal numbers
    (including None and booleans), are skipped.

    Args:
        data: List of dicts
        key: The key to aggregate

    Returns:
        Dict with key, sum, avg, min, max, count (numbers as floats), or a
        dict with an "error" key when no numeric value was found
    """
    values = []
    for item in data:
        if key in item:
            number = _as_float_or_none(item[key])
            if number != None:
                values.append(number)

    if not values:
        return {"error": "No numeric values found for key: " + key}

    # Accumulate in a loop; `sum` is not part of the core Starlark builtins.
    total = 0.0
    for number in values:
        total += number
    count = len(values)

    return {
        "key": key,
        "sum": total,
        "avg": total / count,
        "min": min(values),
        "max": max(values),
        "count": count,
    }

def filter_data(data, field, operator, value):
    """
    Filter a list of dicts based on a condition.

    Items that do not contain the field are skipped. The ordering
    operators (gt, lt, gte, lte) convert both sides with float(), so they
    require numeric values; "contains" compares the string forms.

    Args:
        data: List of dicts to filter
        field: Field name to check
        operator: Comparison operator (eq, ne, gt, lt, gte, lte, contains)
        value: Value to compare against

    Returns:
        {"data": matching items, "count": n}, or a dict with an "error"
        key when the operator is not one of the supported names
    """
    supported = ("eq", "ne", "gt", "lt", "gte", "lte", "contains")

    # Fail loudly on a mistyped operator instead of silently matching nothing.
    if operator not in supported:
        return {"error": "Unsupported operator: " + str(operator)}

    matches = []

    for item in data:
        if field not in item:
            continue

        item_value = item[field]

        if operator == "eq":
            keep = item_value == value
        elif operator == "ne":
            keep = item_value != value
        elif operator == "contains":
            keep = str(value) in str(item_value)
        else:
            # Remaining operators are the numeric orderings.
            left = float(item_value)
            right = float(value)
            if operator == "gt":
                keep = left > right
            elif operator == "lt":
                keep = left < right
            elif operator == "gte":
                keep = left >= right
            else:
                keep = left <= right

        if keep:
            matches.append(item)

    return {"data": matches, "count": len(matches)}

def group_by(data, key):
    """
    Bucket dicts by the string form of one of their fields.

    Args:
        data: List of dicts
        key: Field to group by; items lacking it are left out

    Returns:
        Dict with "groups" (str(field value) -> list of items, in input
        order) and "group_count" (number of distinct groups)
    """
    buckets = {}

    for entry in data:
        if key in entry:
            # Values are stringified, so 1 and "1" share a bucket.
            buckets.setdefault(str(entry[key]), []).append(entry)

    return {"groups": buckets, "group_count": len(buckets)}

# Example usage: four employee records spread over two departments.
sample_data = [
    {"name": "Alice", "department": "Engineering", "salary": 100000},
    {"name": "Bob", "department": "Engineering", "salary": 95000},
    {"name": "Charlie", "department": "Sales", "salary": 85000},
    {"name": "Diana", "department": "Sales", "salary": 90000}
]

# Group the sample records by department and expose the outcome as the
# top-level `result` variable.
result = group_by(sample_data, "department")

Usage:

Tool: script_run
Args: {
"name": "transform",
"function": "aggregate_numbers",
"args": [[{"value": "10"}, {"value": "20"}, {"value": "30"}], "value"]
}

Currency Conversion Script

Convert between currencies using the ExchangeRate-API service.

# @description: Currency conversion using exchange rates
# @author: OpenPact Team
# @version: 1.0.0
# @secrets: EXCHANGE_RATE_API_KEY

def get_rate(from_currency, to_currency):
    """
    Look up the exchange rate for a currency pair.

    Args:
        from_currency: Source currency code (e.g., "USD")
        to_currency: Target currency code (e.g., "EUR")

    Returns:
        Dict with the upper-cased "from" and "to" codes, the "rate" and
        the API's "updated_at" timestamp, or a dict with an "error" key
    """
    key = secrets.get("EXCHANGE_RATE_API_KEY")
    if not key:
        return {"error": "EXCHANGE_RATE_API_KEY not configured"}

    # Codes are upper-cased and trimmed before being placed in the URL path.
    source = from_currency.upper().strip()
    target = to_currency.upper().strip()

    endpoint = "https://v6.exchangerate-api.com/v6/%s/pair/%s/%s"
    resp = http.get(format(endpoint, key, source, target))

    status = resp["status"]
    if status != 200:
        return {"error": "Exchange rate API failed", "status": status}

    payload = json.decode(resp["body"])

    # The body carries its own result flag in addition to the HTTP status.
    if payload["result"] != "success":
        return {"error": "Currency conversion failed", "details": payload}

    return {
        "from": source,
        "to": target,
        "rate": payload["conversion_rate"],
        "updated_at": payload["time_last_update_utc"],
    }

def convert(amount, from_currency, to_currency):
    """
    Convert an amount from one currency to another.

    Args:
        amount: Amount to convert; anything float() accepts
        from_currency: Source currency code
        to_currency: Target currency code

    Returns:
        Dict with the original amount, the currency codes as normalised by
        get_rate, the rate, the converted amount rounded to 2 places and a
        human-readable "formatted" string. If the rate lookup fails, the
        error dict from get_rate is returned unchanged.
    """
    rate_result = get_rate(from_currency, to_currency)

    if "error" in rate_result:
        return rate_result

    rate = rate_result["rate"]

    # Report the codes get_rate actually queried (upper-cased, trimmed) so
    # the output agrees with the lookup even for input like " usd".
    source = rate_result["from"]
    target = rate_result["to"]

    # Convert once and reuse the float for both the arithmetic and the
    # %.2f placeholder, which must not receive a raw string or int amount.
    value = float(amount)
    converted = value * rate

    return {
        "original_amount": amount,
        "from_currency": source,
        "to_currency": target,
        "rate": rate,
        # NOTE(review): `round` is not a core Starlark builtin; this
        # presumes the sandbox provides it - confirm.
        "converted_amount": round(converted, 2),
        "formatted": format(
            "%s %.2f = %s %.2f",
            source, value,
            target, converted
        ),
    }

# Default execution: convert 100 USD to EUR when the script is evaluated and
# expose the outcome as the top-level `result` variable.
result = convert(100, "USD", "EUR")

Usage:

Tool: script_run
Args: {"name": "currency", "function": "convert", "args": [100, "USD", "GBP"]}

API Health Check Script

Monitor the health of external APIs.

# @description: Check health status of external APIs
# @author: OpenPact Team
# @version: 1.0.0
# @secrets: none

def check_endpoint(url, expected_status=200, timeout_threshold_ms=5000):
    """
    Check if an API endpoint answers with the expected HTTP status.

    Args:
        url: URL to check
        expected_status: Expected HTTP status code
        timeout_threshold_ms: Accepted for backward compatibility but not
            enforced; this function does not measure response time

    Returns:
        Dict with "url", "healthy" (True when the status matches),
        "status_code", "expected_status" and a "checked_at" timestamp
        taken after the request completed
    """
    # Starlark has no try/except, so a request failure cannot be caught
    # here. NOTE(review): how http.get reports transport errors is not
    # shown on this page - confirm against the sandbox documentation.
    resp = http.get(url)
    status = resp["status"]

    return {
        "url": url,
        "healthy": status == expected_status,
        "status_code": status,
        "expected_status": expected_status,
        "checked_at": time.now(),
    }

def check_multiple(endpoints):
    """
    Run check_endpoint over a list of endpoints and summarise the outcome.

    Args:
        endpoints: List of dicts with a 'url' key and optional 'name'
            (defaults to the URL) and 'expected_status' (defaults to 200)

    Returns:
        Dict with "all_healthy", "healthy_count", "total_count", the
        per-endpoint "results" and a "checked_at" timestamp
    """
    total = len(endpoints)
    reports = []

    for endpoint in endpoints:
        target = endpoint["url"]

        report = check_endpoint(target, endpoint.get("expected_status", 200))
        report["name"] = endpoint.get("name", target)
        reports.append(report)

        # Small delay between checks, skipped after the last endpoint.
        if len(reports) < total:
            time.sleep(0.5)

    healthy = [report for report in reports if report["healthy"]]

    return {
        "all_healthy": len(healthy) == len(reports),
        "healthy_count": len(healthy),
        "total_count": len(reports),
        "results": reports,
        "checked_at": time.now(),
    }

# Example: Check common public APIs. Each entry supplies the 'name' and
# 'url' keys that check_multiple reads.
endpoints = [
    {"name": "GitHub API", "url": "https://api.github.com"},
    {"name": "JSONPlaceholder", "url": "https://jsonplaceholder.typicode.com/posts/1"},
    {"name": "HTTPBin", "url": "https://httpbin.org/get"}
]

# Run the checks when the script is evaluated and expose the report as the
# top-level `result` variable.
result = check_multiple(endpoints)

Usage:

Tool: script_run
Args: {
"name": "health",
"function": "check_multiple",
"args": [[
{"name": "My API", "url": "https://api.myservice.com/health"},
{"name": "Database", "url": "https://db.myservice.com/status"}
]]
}

Tips for Writing Scripts

1. Always Handle Errors

def safe_api_call(url):
    """
    GET a URL and decode its JSON body, reporting non-200 responses.

    Args:
        url: URL to request

    Returns:
        The decoded JSON body on a 200 response, otherwise a dict with
        "error" and the "status" code received
    """
    resp = http.get(url)
    status = resp["status"]

    if status == 200:
        return json.decode(resp["body"])

    return {"error": "API failed", "status": status}

2. Validate Inputs

def process(value):
    """
    Validate that value is a non-empty string before processing it.

    Args:
        value: The input to validate

    Returns:
        A dict with an "error" key when value is empty or not a string;
        otherwise None (the processing step is a placeholder)
    """
    if not value:
        return {"error": "Value is required"}

    # Starlark's type() returns the type name as a string.
    if type(value) == "string":
        # Process...
        return None

    return {"error": "Value must be a string"}

3. Use Descriptive Metadata

# @description: Clear description of what this does
# @author: Your name
# @version: 1.0.0
# @secrets: LIST, OF, SECRETS

4. Return Structured Data

# Good: a dict with named fields that callers can read individually
return {"temperature": 20, "unit": "celsius", "city": "London"}

# Not ideal: a sentence that callers would have to parse to get the values
return "The temperature in London is 20 celsius"

5. Include Timestamps

return {
"data": result,
"fetched_at": time.now()
}