-# -*- coding: utf8 -*-
+import datetime
+import hashlib
+import random
+import smtplib
+import subprocess
+from email.mime.multipart import MIMEMultipart
+from email.mime.text import MIMEText
-from django.db import models
-
-from django.template import loader, Context
+import html2text
+import mechanize
+import requests
+from ckeditor.fields import RichTextField
from django.conf import settings
-
-from django.utils.translation import ugettext as _
from django.core.mail import send_mail
-# rajout d'un commentaire inutile
-
-class Subscriber(models.Model) :
- email = models.EmailField(unique = True) # TODO : informer si déjà inscrit ? Que faire dans ce cas.
- inscription_date = models.DateField(auto_now_add=True)
- is_validated = models.NullBooleanField() # Au click sur le lien de confirmation. Null si erreur à l'envoi au souscripteur.
- is_registered = models.NullBooleanField() # À l'inscription après la confirmation Null si erreur à l'envoi à mailman.
- password = models.CharField(max_length=100) # sha1
-
- def __unicode__(self) :
+from django.db import IntegrityError, models
+from django.template import loader
+from django.template.loader import render_to_string
+from django.urls import reverse
+from django.utils.encoding import force_bytes
+from django.utils.safestring import mark_safe
+from django.utils.translation import ugettext
+from django.utils.translation import ugettext_lazy as _
+
+
+class Subscriber(models.Model):
+ email = models.EmailField(unique=False)
+ inscription_date = models.DateTimeField(auto_now_add=True)
+ is_validated = models.NullBooleanField()
+ is_registered = models.NullBooleanField()
+ password = models.CharField(max_length=100)
+ bot_check1 = models.BooleanField(default=False)
+ bot_check2 = models.BooleanField(default=False)
+ user_agent = models.CharField(max_length=1000, blank=True)
+ source_ip = models.CharField(max_length=100, blank=True)
+
+ def __unicode__(self):
return self.email
+ def is_from_bot(self):
+ return bool(self.bot_check1 or self.bot_check2)
- def save(self, *args, **kwargs):
- super(Subscriber, self).save(*args, **kwargs)
- if self.is_validated is None:
- self.send_confirmation_email(args[0])
-
- def send_confirmation_email(self, request):
- subject = _("%s's newsletter registration." % settings.ORGANIZATION)
- confirmation_link = ("%s/newsletter/%s" % (request.get_host(), self.password))
- sender = ("noreplay@%s" % request.get_host().strip("www."))
- organization = settings.ORGANIZATION
- organization_url = request.get_host()
- message = loader.get_template("confirmation_email.txt")
- message_context = Context({
- 'organization' : organization,
- 'organization_url' : organization_url,
- 'confirmation_link' : confirmation_link,
- })
- # Susceptible de lever une socket.error ou une SMTPException
+ def send_confirmation_email(self):
+ if self.is_from_bot():
+ return
+ self.password = hashlib.sha1(force_bytes(str(random.random()))).hexdigest()
+ confirm_subject = loader.get_template('newsletter/confirmation_email_subject.txt')
+ confirm_body = loader.get_template('newsletter/confirmation_email_body.txt')
+ context = {'token': self.password}
send_mail(
- subject,
- message.render(message_context),
- sender,
- [self.email]
+ confirm_subject.render(context).strip(),
+ confirm_body.render(context),
+ settings.NEWSLETTER_SENDER,
+ [self.email],
)
- self.is_validated=False
+ self.is_validated = False
+ self.save()
+
+ def subscribe(self):
+ newsletter_service = getattr(settings, 'NEWSLETTER_SERVICE', 'mailman')
+ if newsletter_service == 'mailman':
+ return self.subscribe_in_mailman()
+ elif newsletter_service == 'mailman3':
+ return self.subscribe_in_mailman3()
+ elif newsletter_service == 'mailchimp':
+ return self.subscribe_in_mailchimp()
+
+ def subscribe_in_mailman(self):
+ try:
+ subprocess.run(['listadmin', '--add-member', self.email, settings.NEWSLETTER_NAME], check=True)
+ except subprocess.CalledProcessError:
+ # maybe an error because email is already registered?
+ result = subprocess.run(
+ ['listadmin', '-l', settings.NEWSLETTER_NAME], check=True, capture_output=True, text=True
+ )
+ if self.email in result.stdout.splitlines():
+ self.is_registered = True
+ self.save()
+ return
+ self.is_registered = True
+ self.save()
+
+ def subscribe_in_mailman3(self):
+ # emulate browser to login and subscribe user
+ br = mechanize.Browser()
+ br.open(settings.NEWSLETTER_MAILMAN3_LIST_URL)
+ br.follow_link(url_regex=re.compile('/accounts/login'))
+ br.select_form(action=lambda x: '/login/' in x)
+ br['login'] = settings.NEWSLETTER_MAILMAN3_LOGIN
+ br['password'] = settings.NEWSLETTER_MAILMAN3_PASSWORD
+ response = br.submit()
+ assert br.find_link(url_regex=re.compile('/accounts/logout'))
+ br.open(settings.NEWSLETTER_MAILMAN3_LIST_URL + 'mass_subscribe/')
+ br.select_form(action=lambda x: '/mass_subscribe/' in x)
+ br['emails'] = self.email
+ br['pre_confirmed'] = ['on']
+ br['pre_approved'] = ['on']
+ br['pre_verified'] = ['on']
+ br['send_welcome_message'] = ['False']
+ response = br.submit()
+ self.is_registered = True
+ self.save()
+
+ def subscribe_in_mailchimp(self):
+ dc = settings.MAILCHIMP_DC
+ apikey = settings.MAILCHIMP_APIKEY
+ list_id = settings.MAILCHIMP_LIST_ID
+ self.email = self.email.lower()
+
+ email_hash = hashlib.md5(self.email.encode()).hexdigest()
+
+ resp = requests.put(
+ f'https://{dc}.api.mailchimp.com/3.0/lists/{list_id}/members/{email_hash}',
+ auth=('key', apikey),
+ json={
+ 'email_address': self.email,
+ 'status_if_new': 'subscribed',
+ },
+ )
+ if resp.ok and resp.json().get('status') == 'subscribed':
+ self.is_registered = True
+ self.save()
+
+
+class Newsletter(models.Model):
+ class Meta:
+ verbose_name = _('Newsletter')
+ verbose_name_plural = _('Newsletters')
+ ordering = ['date']
+
+ subject = models.CharField(_('Title'), max_length=50)
+ date = models.DateField(_('Date'))
+ text = RichTextField(_('Text'), null=True, blank=True)
+
+ expedition_datetime = models.DateTimeField(_('Expedition Date/time'), null=True, blank=True)
+
+ def send(self):
+ msg = MIMEMultipart('alternative')
+ msg['Subject'] = self.subject
+ msg['From'] = settings.NEWSLETTER_SENDER
+ msg['To'] = '%s@%s' % (settings.NEWSLETTER_NAME, settings.NEWSLETTER_DOMAIN)
+
+ h2t = html2text.HTML2Text()
+ h2t.unicode_snob = True
+ context = {
+ 'text_part': mark_safe(h2t.handle(self.text)),
+ 'html_part': mark_safe(self.text),
+ }
+
+ part1 = MIMEText(render_to_string('newsletter/email_body.txt', context), 'plain', _charset='utf-8')
+ part2 = MIMEText(render_to_string('newsletter/email_body.html', context), 'html', _charset='utf-8')
+
+ msg.attach(part1)
+ msg.attach(part2)
+
+ s = smtplib.SMTP('localhost')
+ s.sendmail(msg['From'], msg['To'], msg.as_string())
+ s.quit()
+
+ self.expedition_datetime = datetime.datetime.now()
self.save()
+ def get_absolute_url(self):
+ return reverse('newsletter-view', kwargs={'pk': self.id})