cybrkyd

Cross posting to X, Bluesky and my Notes from email with Python

 Sat, 14 Jun 2025 13:06 UTC
Cross posting to X, Bluesky and my Notes from email with Python
Image: CC BY 4.0 by cybrkyd

To bring it all together, I will combine my HTML script, my Bluesky script and my X script into one to enable a three-way cross-post — of a note, post and tweet — all from one e-mail.

The big idea

E-mail. I use it daily and I have it with me at all times. Also, I don’t fancy posting the same thing to three different platforms when I can do it more efficiently.

Plus, I like automation. IFTTT springs to mind and so does Buffer to some extent for their ability to maintain a presence on the socials without the need to log on to each platform.

Cybrkyd doesn’t outsource! So, without further ado, here is my implementation. If you ever see a post on X, Bluesky or my Notes page which points to this article, it was done with the below script (or some version of it).

The script

The one major change from the original HTML script is that this one now writes to a .txt file.

#!/usr/bin/env python3

import requests
from requests_oauthlib import OAuth1
import imaplib
import email
from email.header import decode_header
import os
import re
import shutil
from datetime import datetime
from html.parser import HTMLParser
from urllib.parse import urlparse
from pathlib import Path
from subprocess import run
import mimetypes
import unicodedata
from bs4 import BeautifulSoup

# Email credentials: IMAP host and login used by check_email_and_post()
IMAP_SERVER = 'xxx'
EMAIL_USER = 'xxx'
EMAIL_PASSWORD = 'xxx'

# Twitter Credentials and Endpoints
# The four values below are handed to OAuth1() by create_oauth1().
API_KEY = 'xxx'
API_SECRET_KEY = 'xxx'
ACCESS_TOKEN = 'xxx'
ACCESS_TOKEN_SECRET = 'xxx'

# upload_url is the target of upload_media(); tweet_url is the target of
# post_tweet_with_media().
upload_url = "https://upload.twitter.com/1.1/media/upload.json"
tweet_url = "https://api.twitter.com/2/tweets"

# HTML and image file paths
# HTML_FILE_PATH: file that insert_into_html() prepends each new note to.
# IMAGE_SAVE_DIR: directory that receives converted thumbnails (process_image)
# and e-mail attachments (check_email_and_post).
HTML_FILE_PATH = '/absolute/file/path/to/page1.txt'
IMAGE_SAVE_DIR = '/absolute/file/path/to/img-n/'

# Bluesky credentials, sent to createSession by post_to_bluesky()
BLUESKY_USER = 'xxx'
BLUESKY_PASSWORD = 'xxx'

# Constants
# Matches "http://" or "https://" followed by a run of non-whitespace characters.
URL_PATTERN = re.compile(r'http[s]?://\S+')

# ==== Twitter and HTML Classes and Functions ====
class MetadataParser(HTMLParser):
    """Collect Open Graph values from the ``<meta>`` tags of an HTML document.

    After ``feed()`` has run, ``metadata`` holds the ``content`` attribute of
    the ``og:title``, ``og:description``, ``og:image`` (stored under
    ``thumb``) and ``og:site_name`` tags. Keys whose tag was never seen stay
    ``None``; when a tag occurs more than once the last occurrence wins.
    """

    # Open Graph property name -> key in ``self.metadata``.
    _OG_FIELDS = {
        'og:title': 'title',
        'og:description': 'description',
        'og:image': 'thumb',
        'og:site_name': 'site_name',
    }

    def __init__(self):
        super().__init__()
        self.metadata = dict.fromkeys(("title", "description", "thumb", "site_name"))

    def handle_starttag(self, tag, attrs):
        """Record the content of a recognised ``<meta>`` tag; ignore other tags."""
        if tag != 'meta':
            return
        attributes = dict(attrs)
        # The tag may carry its identifier in either ``property`` or ``name``.
        og_name = attributes.get('property') or attributes.get('name')
        field = self._OG_FIELDS.get(og_name)
        if field is not None:
            self.metadata[field] = attributes.get('content')

def fetch_metadata(url):
    """Fetch *url* and return its Open Graph metadata, if it has any.

    Returns:
        The ``MetadataParser.metadata`` dict when the page answered with
        status 200 and at least one Open Graph value was found; ``None``
        in every other case, including any exception during the request or
        the parse (the lookup is best-effort).
    """
    try:
        response = requests.get(url, headers={'User-Agent': 'Mozilla/5.0'}, timeout=8)
        if response.status_code != 200:
            return None
        og_parser = MetadataParser()
        og_parser.feed(response.text)
        found = og_parser.metadata
        if any(found.values()):
            return found
        return None
    except Exception:
        # Best-effort: a page that cannot be read simply has no metadata.
        return None

def process_image(url):
    """Download an image and convert it to an AVIF file in ``IMAGE_SAVE_DIR``.

    The download is written to a temporary file under ``/tmp`` and handed to
    the external ``convert`` command (presumably ImageMagick - confirm on the
    host), which writes ``<IMAGE_SAVE_DIR>/<id>.avif``.

    Args:
        url: Address of the image to fetch.

    Returns:
        The generated file name (``cybr`` + 12 hex characters + ``.avif``)
        when both the download and the conversion succeeded, otherwise
        ``None`` so the caller can fall back to a default image.
    """
    temp_path = None
    try:
        r = requests.get(url, stream=True, timeout=8)
        if r.status_code != 200:
            return None
        image_id = 'cybr' + os.urandom(6).hex()
        ext = Path(urlparse(url).path).suffix
        temp_path = f"/tmp/{image_id}{ext}"
        final_path = os.path.join(IMAGE_SAVE_DIR, f"{image_id}.avif")
        with open(temp_path, 'wb') as f:
            for chunk in r.iter_content(1024):
                f.write(chunk)
        # check=True turns a non-zero exit status into CalledProcessError, so
        # a failed conversion no longer returns the name of a missing file.
        run(['convert', temp_path, '-quality', '80', final_path], check=True)
        return f"{image_id}.avif"
    except Exception:
        # Best-effort: any download/conversion problem means "no image".
        return None
    finally:
        # Always remove the temporary download, even when conversion failed.
        if temp_path is not None:
            try:
                os.remove(temp_path)
            except OSError:
                pass

def create_oauth1():
    """Build the OAuth1 auth object from the module-level Twitter credentials."""
    credentials = (API_KEY, API_SECRET_KEY, ACCESS_TOKEN, ACCESS_TOKEN_SECRET)
    return OAuth1(*credentials)

def upload_media(media_path):
    """Upload a local media file to ``upload_url`` and return its media id.

    Args:
        media_path: Path of the file to upload.

    Returns:
        The ``media_id_string`` from the JSON response when the upload
        answered with status 200, otherwise ``None``.
    """
    auth = create_oauth1()
    # Context manager so the file handle is closed once the request is done;
    # the caller moves the file right after this call.
    with open(media_path, 'rb') as media_file:
        r = requests.post(upload_url, auth=auth, files={'media': media_file})
    if r.status_code == 200:
        return r.json()['media_id_string']
    return None

def post_tweet_with_media(tweet_content, media_id=None):
    """POST a tweet to ``tweet_url`` and return the raw response.

    Args:
        tweet_content: Text of the tweet.
        media_id: Optional id of previously uploaded media to attach.

    Returns:
        The ``requests`` response object; the caller inspects its status.
    """
    attachment = {"media": {"media_ids": [media_id]}} if media_id else {}
    body = {"text": tweet_content, **attachment}
    return requests.post(tweet_url, auth=create_oauth1(), json=body)

def insert_into_html(block):
    """Prepend *block* to the file at ``HTML_FILE_PATH``.

    The file is rewritten as ``block``, a newline, then the previous
    content; a missing file is treated as empty. The file is read and
    written as UTF-8 explicitly, so notes containing non-ASCII characters
    do not depend on the locale the script happens to run under.

    Args:
        block: Text to place at the top of the file.
    """
    try:
        # Read existing content
        with open(HTML_FILE_PATH, 'r', encoding='utf-8') as f:
            existing_content = f.read()
    except FileNotFoundError:
        existing_content = ""

    # Write new content followed by existing content
    with open(HTML_FILE_PATH, 'w', encoding='utf-8') as f:
        f.write(block + "\n" + existing_content)

def append_plain_post(paragraphs, email_date, image_name=None):
    """Render a plain note as a ``noted-item`` block and prepend it to the page.

    Args:
        paragraphs: Paragraph strings; each becomes one ``<p>`` and every
            newline inside it becomes ``<br>``.
        email_date: ``datetime`` shown as the post time.
        image_name: Optional image file name, referenced under ``/img-n/``.
    """
    pieces = ['<div class="noted-item">\n']
    for paragraph in paragraphs:
        with_breaks = "<br>\n".join(paragraph.split("\n"))
        pieces.append(f'<p>{with_breaks}</p>\n')
    if image_name:
        pieces.append(f'<img alt="{image_name}" src="/img-n/{image_name}">\n')
    timestamp = email_date.strftime("%a, %d %b %Y %H:%M:%S %z")
    pieces.append(f'<p class="noted-post-time">{timestamp}</p>\n</div>\n')
    insert_into_html("".join(pieces))

def append_social_card(user_note, url, title, thumb_filename, email_date, site=None, description=None):
    """Render a link-preview card as a ``noted-item`` block and prepend it to the page.

    Values that originate from the linked page (title, site name,
    description, image address) and the URL itself are HTML-escaped before
    being written. ``user_note`` is inserted as-is, matching how
    ``append_plain_post`` treats the author's own text.

    Args:
        user_note: The author's text; newlines become ``<br>``. May be empty.
        url: Link target of the card.
        title: Card title; also used as the image ``alt`` text
            (``"Link preview"`` when empty).
        thumb_filename: Either an absolute ``http...`` image URL or a file
            name served from ``/img-n/``.
        email_date: ``datetime`` shown as the post time.
        site: Optional site name line.
        description: Optional description; newlines are flattened to spaces.
    """
    from html import escape  # local import: only html.parser is imported at file level

    content = '<div class="noted-item">\n'
    if user_note:
        replaced = user_note.replace("\n", "<br>\n")
        content += f'<p>{replaced}</p>\n'
    content += '<div class="noted-card-wrapper">\n'
    content += f'<a href="{escape(url)}" target="_blank" class="noted-card">\n'
    img_src = thumb_filename if thumb_filename.startswith("http") else f"/img-n/{thumb_filename}"
    alt_text = escape(title or "Link preview")
    content += f'<img src="{escape(img_src)}" alt="{alt_text}" class="noted-card-image" />\n'
    if site:
        content += f'<p class="noted-card-url">{escape(site)}</p>\n'
    if title:
        # Use the linked page's title; this line previously emitted a fixed string.
        content += f'<p class="noted-card-title">{escape(title)}</p>\n'
    if description:
        desc = description.replace("\n", " ").strip()
        content += f'<p class="noted-card-description">{escape(desc)}</p>\n'
    content += '</a>\n</div>\n'
    content += f'<p class="noted-post-time">{email_date.strftime("%a, %d %b %Y %H:%M:%S %z")}</p>\n</div>\n'
    insert_into_html(content)

# ==== Bluesky Functions ====
def generate_facets_from_links_in_text(text):
    """Build Bluesky link facets for every URL found in *text*.

    Each ``URL_PATTERN`` match is trimmed of surrounding whitespace and
    trailing punctuation and reported with its position in the text. The
    positions are offsets into the UTF-8 encoding of *text*, as the
    ``byteStart``/``byteEnd`` field names require, so links stay aligned
    when non-ASCII characters precede them.

    Args:
        text: The post text that will be sent alongside the facets.

    Returns:
        A list of facet dicts (empty when *text* contains no URL).
    """
    facets = []
    for match in URL_PATTERN.finditer(text):
        url = match.group(0).strip().strip('.,!?)]}>"\'')
        # Character index -> byte index: encode everything before the match.
        byte_start = len(text[:match.start()].encode('utf-8'))
        byte_end = byte_start + len(url.encode('utf-8'))
        facets.append({
            "index": {"byteStart": byte_start, "byteEnd": byte_end},
            "features": [{
                "$type": "app.bsky.richtext.facet#link",
                "uri": url
            }]
        })
    return facets

def upload_image_to_bluesky(image_data, content_type, access_token):
    """Upload raw image bytes as a Bluesky blob.

    Args:
        image_data: The image bytes, sent as the request body.
        content_type: MIME type sent in the ``Content-Type`` header.
        access_token: Session token sent as a bearer token.

    Returns:
        The ``blob`` entry of the JSON response on status 200, else ``None``.
    """
    response = requests.post(
        "https://bsky.social/xrpc/com.atproto.repo.uploadBlob",
        headers={
            "Content-Type": content_type,
            "Authorization": f"Bearer {access_token}",
        },
        data=image_data,
    )
    if response.status_code != 200:
        return None
    return response.json()["blob"]

def fetch_embed_url_card(access_token, url):
    """Fetch *url* and build the data for a Bluesky external-link card.

    The title comes from ``<title>`` (falling back to ``og:title``), the
    description from the ``description`` meta tag (falling back to
    ``og:description``). When the page declares an ``og:image`` it is
    downloaded and uploaded as a blob; a thumbnail that cannot be fetched
    or uploaded is skipped rather than discarding the whole card.

    Args:
        access_token: Bluesky session token used for the blob upload.
        url: Page to describe.

    Returns:
        A dict with ``uri``, ``title``, ``description`` and, when available,
        ``thumb``; ``None`` when the page itself could not be fetched or
        parsed.
    """
    card = {"uri": url, "title": "", "description": ""}
    try:
        resp = requests.get(url, timeout=10)
        resp.raise_for_status()
        soup = BeautifulSoup(resp.text, "html.parser")

        og_title = soup.find("meta", property="og:title")
        if soup.title and soup.title.string:
            card["title"] = soup.title.string.strip()
        elif og_title and og_title.get("content"):
            # .get() guards against an og:title tag without a content attribute.
            card["title"] = og_title["content"].strip()

        desc = soup.find("meta", attrs={"name": "description"}) or soup.find("meta", property="og:description")
        card["description"] = desc["content"].strip() if desc and desc.get("content") else ""
        og_img = soup.find("meta", property="og:image")
    except Exception:
        # Exception rather than a bare except, so KeyboardInterrupt and
        # SystemExit still propagate.
        return None

    if og_img and og_img.get("content"):
        try:
            thumb_url = og_img["content"].strip()
            img_resp = requests.get(thumb_url, timeout=10)
            img_resp.raise_for_status()
            content_type = img_resp.headers.get("Content-Type", "image/jpeg")
            blob = upload_image_to_bluesky(img_resp.content, content_type, access_token)
            if blob:
                card["thumb"] = blob
        except Exception as exc:
            # The thumbnail is optional: keep the title/description card.
            print("Bluesky card thumbnail skipped:", exc)
    return card

def post_to_bluesky(post_text, images=None):
    """Create a Bluesky post from *post_text*, with a link card or images.

    A session is opened with ``BLUESKY_USER``/``BLUESKY_PASSWORD``. When the
    text contains URLs, a link card is built for the first one and the URLs
    are removed from the posted text. If no card can be built, the text is
    posted with its URLs intact and link facets, so the link is not lost.
    Images are attached only when no link card was produced.

    Args:
        post_text: Full text of the post, possibly containing URLs.
        images: Optional iterable of ``(image_bytes, mime_type)`` tuples.

    Returns:
        ``None``. The outcome is printed; an authentication failure is
        printed and the function returns without posting.
    """
    from datetime import timezone  # local import: only ``datetime`` is imported at file level

    auth_payload = {"identifier": BLUESKY_USER, "password": BLUESKY_PASSWORD}
    session = requests.post('https://bsky.social/xrpc/com.atproto.server.createSession', json=auth_payload)
    if session.status_code != 200:
        print("Bluesky auth failed:", session.text)
        return
    data = session.json()
    access_token = data["accessJwt"]
    did = data["did"]

    urls = URL_PATTERN.findall(post_text)
    text = URL_PATTERN.sub('', post_text).strip()
    facets = []
    embed = None

    if urls:
        card = fetch_embed_url_card(access_token, urls[0])
        if card:
            external = {
                "uri": card["uri"],
                "title": card["title"],
                "description": card["description"],
            }
            # Only send "thumb" when a blob exists; never send it as null.
            if card.get("thumb"):
                external["thumb"] = card["thumb"]
            embed = {"$type": "app.bsky.embed.external", "external": external}
        else:
            # No card: keep the URLs in the text and mark them as links.
            text = post_text.strip()
            facets = generate_facets_from_links_in_text(text)

    if not embed and images:
        embeds = []
        for img_data, img_type in images:
            blob = upload_image_to_bluesky(img_data, img_type, access_token)
            if blob:
                embeds.append({"alt": "image", "image": blob})
        if embeds:
            embed = {"$type": "app.bsky.embed.images", "images": embeds}

    # Timezone-aware replacement for the deprecated datetime.utcnow(); the
    # resulting string keeps the same "...Z" form.
    created_at = datetime.now(timezone.utc).isoformat(timespec='milliseconds').replace('+00:00', 'Z')

    record = {
        "collection": "app.bsky.feed.post",
        "repo": did,
        "record": {
            "text": text,
            "facets": facets,
            "createdAt": created_at,
            "$type": "app.bsky.feed.post"
        }
    }

    if embed:
        record["record"]["embed"] = embed

    headers = {"Authorization": f"Bearer {access_token}", "Content-Type": "application/json"}
    r = requests.post('https://bsky.social/xrpc/com.atproto.repo.createRecord', json=record, headers=headers)
    print("Bluesky response:", r.status_code, r.text)

# ==== Email Handler ====
def _safe_attachment_name(filename):
    """Reduce an attachment file name to a bare name, or ``None`` if unusable."""
    if not filename:
        return None
    # The name comes from the e-mail, so drop any directory part ("../x",
    # "/etc/x", "C:\\x") before it is used to build a path.
    name = os.path.basename(filename.replace("\\", "/"))
    if name in ("", ".", ".."):
        return None
    return name


def check_email_and_post():
    """Cross-post every unseen "tweet.it" e-mail to X, Bluesky and the notes page.

    For each unseen inbox message whose subject is exactly ``tweet.it``
    (case-insensitive):

    * the ``text/plain`` part becomes the post text (NFKC-normalised,
      carriage returns removed) and is split into paragraphs on blank lines;
    * image attachments are collected for Bluesky; a named attachment is
      also saved to ``/tmp``, uploaded to X and then moved to
      ``IMAGE_SAVE_DIR``;
    * the text is posted to X and to Bluesky;
    * only when X answers 201 is the message flagged ``\\Seen`` and a note
      written - a link card when the text has a URL with metadata, a plain
      post otherwise.

    The IMAP connection is logged out even when processing raises.
    """
    from email.utils import parsedate_to_datetime  # email.utils is not imported at file level

    mail = imaplib.IMAP4_SSL(IMAP_SERVER)
    mail.login(EMAIL_USER, EMAIL_PASSWORD)
    try:
        mail.select("inbox")

        _status, messages = mail.search(None, '(UNSEEN SUBJECT "tweet.it")')
        email_ids = messages[0].split()

        for email_id in email_ids:
            # NOTE(review): a plain RFC822 fetch may itself set \Seen on the
            # server; confirm whether BODY.PEEK[] is wanted here.
            _res, fetched = mail.fetch(email_id, "(RFC822)")
            for item in fetched:
                if not isinstance(item, tuple):
                    continue
                msg = email.message_from_bytes(item[1])
                subject = decode_header(msg["Subject"])[0][0]
                if isinstance(subject, bytes):
                    subject = subject.decode(errors='replace')

                if subject.lower() != "tweet.it":
                    continue

                post_text = ""
                media_path = None
                image_name = None
                images = []
                email_date = datetime.now()
                paragraphs = []

                for part in msg.walk():
                    ctype = part.get_content_type()
                    if ctype == "text/plain":
                        try:
                            email_date = parsedate_to_datetime(msg["Date"])
                        except (TypeError, ValueError):
                            # Missing or malformed Date header: keep "now".
                            pass
                        post_text = part.get_payload(decode=True).decode(part.get_content_charset() or 'utf-8', errors='replace')
                        post_text = unicodedata.normalize("NFKC", post_text.replace('\r', ''))
                        paragraphs = [p.strip() for p in post_text.split('\n\n') if p.strip()]
                    elif ctype.startswith("image/"):
                        image_data = part.get_payload(decode=True)
                        images.append((image_data, ctype))
                        filename = _safe_attachment_name(part.get_filename())
                        if filename:
                            image_name = filename
                            media_path = os.path.join("/tmp", filename)
                            with open(media_path, "wb") as f:
                                f.write(image_data)

                media_id = None
                if media_path:
                    media_id = upload_media(media_path)
                    shutil.move(media_path, os.path.join(IMAGE_SAVE_DIR, image_name))

                # Post to Twitter
                twitter_response = post_tweet_with_media(post_text, media_id)

                # Post to Bluesky
                post_to_bluesky(post_text, images)

                # Mark as read only if Twitter post succeeds
                if twitter_response.status_code != 201:
                    continue
                mail.store(email_id, '+FLAGS', '\\Seen')
                urls = URL_PATTERN.findall(post_text)
                if not urls:
                    append_plain_post(paragraphs, email_date, image_name if media_path else None)
                    continue

                meta = fetch_metadata(urls[0])
                desc_text = re.sub(URL_PATTERN, '', post_text).strip()
                if not meta:
                    append_plain_post(paragraphs, email_date)
                    continue

                title = meta.get("title") or "No title"
                thumb = meta.get("thumb") or "fallback.avif"
                site = meta.get("site_name")
                # Images already hosted on the site are linked directly;
                # anything else is downloaded and converted locally.
                thumb_filename = thumb if thumb.startswith('https://cybrkyd.com') else process_image(thumb)
                if not thumb_filename:
                    thumb_filename = "fallback.avif"
                append_social_card(desc_text, urls[0], title, thumb_filename, email_date, site, meta.get("description"))
    finally:
        mail.logout()

# Execute only when run as a script, so importing this module does not
# trigger a mailbox check and cross-post as a side effect.
if __name__ == "__main__":
    check_email_and_post()

Have fun and remember: always respect API limits. The end.

»
Tagged in: #Python

Visitors: Loading...