This is my proof of concept for sending a post to Bluesky via an e-mail with Python. The script checks for unread e-mails with the subject “bluesky.it”, extracts the content, and sends a post to Bluesky with proper formatting, link handling, and image attachments. Note that it should only handle one e-mail at a time.
As always, scripts can be automated to run at set intervals via a CRON job. In this case, it can be set to check e-mails, say, every 15 minutes.
As of now, it does not appear that IFTTT can do the below; their Bluesky offerings are a tad bit slim. I accept that working with Bluesky’s API is extremely challenging (for me; maybe for them, too) and this script was only fully completed yesterday, after months of on-and-off tinkering to iron out my bugs.
Workflow
- User sends themselves an e-mail with subject “bluesky.it”
- For each matching e-mail:
- Extract plain text content
- Gather any image attachments
- Processes the content:
- Clean and normalise text
- Extract and handle URLs
- Upload images
- Post to Bluesky with:
- Formatted text
- URL embeds (when links are present)
- Image attachments (when available)
Key Features
-
E-mail Processing:
- Connects to an IMAP e-mail server
- Searches for unread e-mails with specific subject
- Extracts both text and image attachments
-
Bluesky Integration:
- Authenticates with Bluesky API
- Handles text posts with proper formatting
- Processes embedded links with rich preview cards
- Uploads and attaches images
-
URL Handling:
- Extracts URLs from text
- Generates link facets for proper URL display
- Creates rich embed cards with metadata (title, description, thumbnail)
-
Image Processing:
- Supports multiple image attachments
- Uploads images to Bluesky’s storage
- Properly formats image embeds
The script
The Bluesky DID is retrieved as part of the operation, so the only credentials required are:
- BLUESKY_USER
- BLUESKY_PASSWORD
Note that the Python module atproto is not used.
#!/usr/bin/env python3
import requests
import imaplib
import email
from email.header import decode_header
import os
import re
import unicodedata
from datetime import datetime
from bs4 import BeautifulSoup
from urllib.parse import urlparse
import mimetypes
# Email credentials
IMAP_SERVER = 'xxx'
EMAIL_USER = 'xxx'
EMAIL_PASSWORD = 'xxx'
# Bluesky credentials
BLUESKY_USER = 'xxx'
BLUESKY_PASSWORD = 'xxx'
# Regex to find URLs
URL_PATTERN = re.compile(r'http[s]?://\S+')
def generate_facets_from_links_in_text(text):
facets = []
# Find all URLs in the provided text
for match in URL_PATTERN.finditer(text):
url = match.group(0).strip().strip('.,!?)]}>"\'')
start = match.start()
end = start + len(url)
facets.append({
"index": {"byteStart": start, "byteEnd": end},
"features": [{
"$type": "app.bsky.richtext.facet#link",
"uri": url
}]
})
return facets
def upload_image_to_bluesky(image_data, content_type, access_token):
# Prepare headers for the image upload request
headers = {
"Content-Type": content_type,
"Authorization": f"Bearer {access_token}",
}
# Send a POST request to upload the image
response = requests.post(
"https://bsky.social/xrpc/com.atproto.repo.uploadBlob",
headers=headers,
data=image_data
)
# Check if the upload was successful
if response.status_code == 200:
return response.json()["blob"]
else:
print("Failed to upload image to Bluesky:", response.text)
return None
# Initialize a card structure to hold URL metadata
def fetch_embed_url_card(access_token, url):
card = {
"uri": url,
"title": "",
"description": "",
}
# Fetch the content of the URL
try:
resp = requests.get(url, timeout=10)
resp.raise_for_status()
except Exception as e:
print(f"Failed to fetch URL: {url} - {e}")
return None
# Parse the HTML content of the page
soup = BeautifulSoup(resp.text, "html.parser")
# Extract title
title = soup.title.string.strip() if soup.title and soup.title.string else ""
if not title:
og_title = soup.find("meta", property="og:title")
title = og_title["content"].strip() if og_title and og_title.get("content") else ""
card["title"] = title
# Extract description
description = ""
meta_desc = soup.find("meta", attrs={"name": "description"})
if meta_desc and meta_desc.get("content"):
description = meta_desc["content"].strip()
else:
og_desc = soup.find("meta", property="og:description")
if og_desc and og_desc.get("content"):
description = og_desc["content"].strip()
card["description"] = description
# Extract thumbnail
thumb_url = ""
og_image = soup.find("meta", property="og:image")
if og_image and og_image.get("content"):
thumb_url = og_image["content"].strip()
# If a thumbnail URL is found, fetch the image
if thumb_url:
try:
thumb_resp = requests.get(thumb_url, timeout=10)
thumb_resp.raise_for_status()
content_type = thumb_resp.headers.get("Content-Type", "")
if not content_type:
ext = os.path.splitext(urlparse(thumb_url).path)[1]
content_type = mimetypes.types_map.get(ext.lower(), "image/jpeg")
blob = upload_image_to_bluesky(thumb_resp.content, content_type, access_token)
if blob:
card["thumb"] = blob
except Exception as e:
print(f"Failed to fetch thumbnail: {thumb_url} - {e}")
return card
def post_to_bluesky(post_text, images=None):
auth_payload = {
"identifier": BLUESKY_USER,
"password": BLUESKY_PASSWORD
}
headers = {"Content-Type": "application/json"}
# Send a POST request to create a session with Bluesky
session_response = requests.post(
'https://bsky.social/xrpc/com.atproto.server.createSession',
json=auth_payload, headers=headers
)
if session_response.status_code != 200:
print("Session error:", session_response.text)
return
data = session_response.json()
access_jwt = data.get("accessJwt")
did = data.get("did")
# Find all URLs in post text
urls = URL_PATTERN.findall(post_text)
# Remove all URLs from post body for Bluesky
cleaned_post_text = URL_PATTERN.sub('', post_text).strip()
facets = generate_facets_from_links_in_text(cleaned_post_text)
embed = None
# If there's a URL, fetch card info and generate embed
if urls:
card = fetch_embed_url_card(access_jwt, urls[0]) # Only process the first URL
if card:
embed = {
"$type": "app.bsky.embed.external",
"external": {
"uri": card["uri"],
"title": card["title"],
"description": card["description"],
"thumb": card.get("thumb")
}
}
# If no embed from URL, check for image attachments
if not embed and images:
image_embeds = []
for img_data, img_type in images:
# Upload each image to Bluesky and get the blob data
blob = upload_image_to_bluesky(img_data, img_type, access_jwt)
if blob:
image_embeds.append({
"alt": "image",
"image": blob
})
if image_embeds:
embed = {
"$type": "app.bsky.embed.images",
"images": image_embeds
}
# Prepare the payload for posting the record to Bluesky
record_payload = {
"collection": "app.bsky.feed.post",
"repo": did,
"record": {
"text": cleaned_post_text,
"facets": facets,
"createdAt": datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + "Z",
"$type": "app.bsky.feed.post"
}
}
if embed:
record_payload["record"]["embed"] = embed
record_headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {access_jwt}"
}
post_response = requests.post(
'https://bsky.social/xrpc/com.atproto.repo.createRecord',
json=record_payload,
headers=record_headers
)
if post_response.status_code == 200:
print("Bluesky post successful.")
else:
print("Bluesky post failed:", post_response.text)
# ==== Email Handler ====
def check_email_and_post_to_bluesky():
mail = imaplib.IMAP4_SSL(IMAP_SERVER)
mail.login(EMAIL_USER, EMAIL_PASSWORD)
mail.select("inbox")
status, messages = mail.search(None, '(UNSEEN SUBJECT "bluesky.it")')
email_ids = messages[0].split()
for email_id in email_ids:
res, msg_data = mail.fetch(email_id, "(RFC822)")
for part in msg_data:
if isinstance(part, tuple):
msg = email.message_from_bytes(part[1])
subject, encoding = decode_header(msg["Subject"])[0]
if isinstance(subject, bytes):
subject = subject.decode(encoding or 'utf-8', errors='replace')
if subject.lower() == "bluesky.it":
post_text = ""
images = []
for part in msg.walk():
content_type = part.get_content_type()
if content_type == "text/plain":
charset = part.get_content_charset() or 'utf-8'
post_text = part.get_payload(decode=True).decode(charset, errors='replace')
post_text = post_text.replace('\r', '').replace('\uFFFD', '').strip()
post_text = unicodedata.normalize("NFKC", post_text)
elif content_type.startswith("image/"):
image_data = part.get_payload(decode=True)
images.append((image_data, content_type))
if post_text:
post_to_bluesky(post_text, images)
mail.store(email_id, '+FLAGS', '\\Seen')
mail.logout()
# Check emails and post to Bluesky
check_email_and_post_to_bluesky()
The end.
