This is my proof of concept for sending a post to X (Twitter) via an e-mail with Python. The script checks for unread e-mails with the subject “tweet.it”, extracts the content, and sends a post to X with proper formatting and image attachments (if present). Note that it should only handle one e-mail at a time.
The script can be automated to run at set intervals via a cron job, for example to check for new e-mails every 15 minutes.
Workflow
- Checks for new “tweet.it” emails
- Extracts text content (tweet body)
- Saves any image attachments temporarily
- Uploads images to Twitter if present
- Posts combined tweet (text + media)
- Marks processed emails as read
The script
#!/usr/bin/env python3
import requests
from requests_oauthlib import OAuth1
import imaplib
import email
from email.header import decode_header
import os
from datetime import datetime
# Twitter API credentials
# Placeholder values ('xxx'); these four constants are passed, in this order,
# to OAuth1 by create_oauth1().
API_KEY = 'xxx'
API_SECRET_KEY = 'xxx'
ACCESS_TOKEN = 'xxx'
ACCESS_TOKEN_SECRET = 'xxx'
# Email credentials
# Placeholder values ('xxx'); used by check_email_and_tweet() to open the
# IMAP-over-SSL connection and log in.
IMAP_SERVER = 'xxx'
EMAIL_USER = 'xxx'
EMAIL_PASSWORD = 'xxx'
# Endpoint URLs
# upload_url is the v1.1 media upload endpoint used by upload_media();
# tweet_url is the v2 tweets endpoint used by post_tweet_with_media().
upload_url = "https://upload.twitter.com/1.1/media/upload.json"
tweet_url = "https://api.twitter.com/2/tweets"
def create_oauth1():
    """Build the OAuth1 auth object used to sign Twitter API requests.

    Returns:
        An ``OAuth1`` instance constructed from the module-level API key,
        API secret key, access token and access token secret.
    """
    credentials = (API_KEY, API_SECRET_KEY, ACCESS_TOKEN, ACCESS_TOKEN_SECRET)
    return OAuth1(*credentials)
def upload_media(media_path):
    """Upload a local media file to Twitter and return its media ID.

    Args:
        media_path: Filesystem path of the file to upload.

    Returns:
        The ``media_id_string`` from the upload response when the request
        returns HTTP 200, otherwise ``None``.
    """
    auth = create_oauth1()
    # Open the file in a context manager so the handle is closed as soon as
    # the request has been sent, instead of leaking it until garbage
    # collection.
    with open(media_path, 'rb') as media_file:
        r = requests.post(upload_url, auth=auth, files={'media': media_file})
    if r.status_code == 200:
        return r.json()['media_id_string']
    return None
def post_tweet_with_media(tweet_content, media_id=None):
    """Post a tweet, optionally attaching one previously uploaded media item.

    Args:
        tweet_content: Text of the tweet.
        media_id: Media ID to attach; when falsy the tweet is sent without
            a ``media`` section.

    Returns:
        The ``requests`` response object from the POST to ``tweet_url``.
    """
    media_section = {"media": {"media_ids": [media_id]}} if media_id else {}
    body = {"text": tweet_content, **media_section}
    return requests.post(tweet_url, auth=create_oauth1(), json=body)
def mark_email_as_read(mail, email_id):
    """Add the IMAP "Seen" flag to a message so it is no longer unread.

    Args:
        mail: Object exposing an ``imaplib``-style ``store`` method.
        email_id: Message identifier to flag.
    """
    # The backslash is escaped explicitly: a bare '\S' is an invalid escape
    # sequence and triggers a SyntaxWarning on recent Python versions.
    mail.store(email_id, '+FLAGS', '\\Seen')
def _decode_subject(msg):
    """Return the first decoded chunk of the Subject header as text ('' if absent)."""
    raw_subject = msg["Subject"]
    if raw_subject is None:
        return ""
    subject, encoding = decode_header(raw_subject)[0]
    if isinstance(subject, bytes):
        # decode_header may report no charset for raw bytes; fall back to
        # UTF-8 instead of calling bytes.decode(None), which raises.
        subject = subject.decode(encoding or "utf-8", errors="replace")
    return subject


def _extract_tweet_parts(msg):
    """Return (tweet_text, image_bytes, image_suffix) taken from the message parts."""
    tweet_text = ""
    image_bytes = None
    image_suffix = ""
    for part in msg.walk():
        content_type = part.get_content_type()
        if content_type == "text/plain":
            payload = part.get_payload(decode=True)
            if payload is not None:
                # A part without a declared charset yields None here.
                charset = part.get_content_charset() or "utf-8"
                tweet_text = payload.decode(charset, errors="replace")
        elif content_type.startswith("image"):
            filename = part.get_filename()
            if filename:
                # Keep the bytes in memory; as before, the last named image
                # wins, but nothing is written to disk for the others.
                image_bytes = part.get_payload(decode=True)
                # Only the extension of the sender-supplied name is kept so
                # the name can never steer where the file gets written.
                image_suffix = os.path.splitext(os.path.basename(filename))[1]
    return tweet_text, image_bytes, image_suffix


def _upload_image_bytes(image_bytes, suffix):
    """Write the image to a private temp file, upload it, and always delete the file."""
    import tempfile

    fd, media_path = tempfile.mkstemp(suffix=suffix)
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(image_bytes)
        return upload_media(media_path)
    finally:
        os.remove(media_path)


def check_email_and_tweet():
    """Tweet the content of every unread "tweet.it" e-mail in the inbox.

    Logs in to the IMAP server, searches the inbox for unseen messages whose
    subject contains "tweet.it", and for each message whose decoded subject
    is exactly "tweet.it" (case-insensitive) posts its plain-text body as a
    tweet, attaching the last named image part if there is one. A message is
    marked as read only when the tweet request returns HTTP 201. The IMAP
    session is logged out even if an error occurs.
    """
    mail = imaplib.IMAP4_SSL(IMAP_SERVER)
    try:
        mail.login(EMAIL_USER, EMAIL_PASSWORD)
        mail.select("inbox")
        status, messages = mail.search(None, '(UNSEEN SUBJECT "tweet.it")')
        if status != "OK" or not messages or not messages[0]:
            return
        for email_id in messages[0].split():
            # BODY.PEEK[] returns the full message like RFC822 does, but
            # without implicitly setting the Seen flag, so a message whose
            # tweet fails stays unread and is picked up on the next run.
            res, fetched = mail.fetch(email_id, "(BODY.PEEK[])")
            for response_part in fetched:
                if not isinstance(response_part, tuple):
                    continue
                msg = email.message_from_bytes(response_part[1])
                if _decode_subject(msg).lower() != "tweet.it":
                    continue
                tweet_text, image_bytes, image_suffix = _extract_tweet_parts(msg)
                media_id = None
                if image_bytes is not None:
                    media_id = _upload_image_bytes(image_bytes, image_suffix)
                response = post_tweet_with_media(tweet_text, media_id)
                if response.status_code == 201:
                    mark_email_as_read(mail, email_id)
    finally:
        mail.logout()
# Run one mailbox check as soon as this file is executed (this call is at
# module level, so it also runs if the file is imported).
check_email_and_tweet()
Easy-peasy! The end.
»
Visitors: Loading...